gpu_executor.py

from typing import Dict, List, Optional, Tuple

from loguru import logger

from aphrodite.lora.request import LoRARequest
from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
                                     ParallelConfig, SchedulerConfig,
                                     LoRAConfig, VisionLanguageConfig,
                                     SpeculativeConfig)
from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
from aphrodite.common.utils import (
    get_ip,
    get_open_port,
    get_distributed_init_method,
    make_async,
)


class GPUExecutor(ExecutorBase):
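    """Executor that runs the model on a single GPU via an in-process
    driver Worker."""
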
    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig],
        speculative_config: Optional[SpeculativeConfig],
    ) -> None:
        self.model_config = model_config
        self.cache_config = cache_config
        self.lora_config = lora_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.vision_language_config = vision_language_config

        assert (not speculative_config
                ), "Speculative decoding not yet supported for GPU backend"

        # Instantiate the worker and load the model to GPU.
        self._init_worker()

    def _init_worker(self):
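        """Create the single driver Worker, initialize its device, and load
        the model weights."""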
        # Lazy import the Worker to avoid importing torch.cuda/xformers
        # before CUDA_VISIBLE_DEVICES is set in the Worker.
        from aphrodite.task_handler.worker import Worker

        assert (self.parallel_config.world_size == 1
                ), "GPUExecutor only supports single GPU."

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        self.driver_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker."""
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log at the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")
        min_concurrency = (num_gpu_blocks * self.cache_config.block_size /
                           self.scheduler_config.max_model_len)
        logger.info(f"Minimum concurrency: {min_concurrency:.2f}x")
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> SamplerOutput:
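        """Run a single model execution step on the driver worker and return
        its sampler output."""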
        output = self.driver_worker.execute_model(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
        )
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> List[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # GPUExecutor will always be healthy as long as it's running.
        return


class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase):
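    """Async variant of GPUExecutor that dispatches ``execute_model`` to the
    driver worker through ``make_async`` so callers can await it without
    blocking the event loop."""
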
    async def execute_model_async(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> SamplerOutput:
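        """Asynchronously run a single model execution step on the driver
        worker."""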
        output = await make_async(self.driver_worker.execute_model)(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
        )
        return output

    async def check_health_async(self) -> None:
        # GPUExecutor will always be healthy as long as it's running.
        return
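

# Usage sketch (illustrative; not part of the original module). In practice
# the serving engine constructs the executor and builds the config objects
# from its engine arguments, so the config instances below are assumed to
# already exist, and `seq_group_metadata_list` would typically come from the
# scheduler:
#
#     executor = GPUExecutor(
#         model_config=model_config,
#         cache_config=cache_config,
#         parallel_config=parallel_config,
#         scheduler_config=scheduler_config,
#         device_config=device_config,
#         lora_config=None,
#         vision_language_config=None,
#         speculative_config=None,
#     )
#     num_gpu_blocks, num_cpu_blocks = executor.determine_num_available_blocks()
#     executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#     output = executor.execute_model(seq_group_metadata_list,
#                                     blocks_to_swap_in={},
#                                     blocks_to_swap_out={},
#                                     blocks_to_copy={})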