gpu_executor.py

from typing import Dict, List, Set, Tuple

from loguru import logger

from aphrodite.lora.request import LoRARequest
from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
from aphrodite.common.utils import (
    get_ip,
    get_open_port,
    get_distributed_init_method,
    make_async,
)


class GPUExecutor(ExecutorBase):

    def _init_executor(self) -> None:
        """Initialize the worker and load the model.

        If speculative decoding is enabled, we instead create the speculative
        worker.
        """
        if self.speculative_config is None:
            self._init_non_spec_worker()
        else:
            self._init_spec_worker()

    def _init_non_spec_worker(self):
        # Lazy import the Worker to avoid importing torch.cuda/xformers
        # before CUDA_VISIBLE_DEVICES is set in the Worker.
        from aphrodite.task_handler.worker import Worker

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        self.driver_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model()
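
    # NOTE: get_distributed_init_method(get_ip(), get_open_port()) is assumed
    # to yield a local TCP endpoint (roughly "tcp://<ip>:<port>"); even in the
    # single-GPU case the Worker uses it to set up its world_size == 1
    # process group.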

    def _init_spec_worker(self):
        """Initialize a SpecDecodeWorker, using a draft model for proposals."""
        assert self.speculative_config is not None

        from aphrodite.spec_decode.spec_decode_worker import SpecDecodeWorker
        from aphrodite.task_handler.worker import Worker

        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())

        target_worker = Worker(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=True,
        )

        spec_decode_worker = SpecDecodeWorker.create_worker(
            scorer_worker=target_worker,
            speculative_config=self.speculative_config,
        )

        assert self.parallel_config.world_size == 1, (
            "GPUExecutor only supports single GPU.")

        self.driver_worker = spec_decode_worker

        # Load model handled in spec decode worker.
        self.driver_worker.init_device()
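
    # NOTE: SpecDecodeWorker.create_worker presumably pairs the target
    # ("scorer") worker with a draft-model proposer built from
    # speculative_config: the draft model proposes candidate tokens and the
    # target model scores and accepts/rejects them.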

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker."""
        # NOTE: This is logged in the executor because there can be >1 worker
        # with other executors. We could log at the engine level, but work
        # remains to abstract away the device for non-GPU configurations.
        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")
        logger.info(
            f"Minimum concurrency: {num_gpu_blocks * self.cache_config.block_size / self.scheduler_config.max_model_len:.2f}x"  # noqa: E501
        )
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
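
    # For intuition on the "minimum concurrency" figure logged above: roughly,
    # it is how many maximum-length sequences the KV cache can hold at once.
    # For example, with 2048 GPU blocks, a block size of 16 tokens, and
    # max_model_len = 4096, it comes out to 2048 * 16 / 4096 = 8.00x.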

    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        num_lookahead_slots: int,
    ) -> List[SamplerOutput]:
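        # The block maps come from the scheduler: blocks_to_swap_in/out appear
        # to map block numbers between CPU swap space and GPU memory, and
        # blocks_to_copy maps a source block to the destination blocks it is
        # copied into (e.g. on copy-on-write forks). num_lookahead_slots
        # reserves extra KV slots per sequence for speculative decoding.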
        output = self.driver_worker.execute_model(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=num_lookahead_slots,
        )
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # GPUExecutor will always be healthy as long as it's running.
        return


class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase):

    async def execute_model_async(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        num_lookahead_slots: int,
    ) -> List[SamplerOutput]:
        # Offload the blocking driver-worker call via make_async so the
        # event loop is not blocked while the model step runs.
        output = await make_async(self.driver_worker.execute_model)(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            num_lookahead_slots=num_lookahead_slots,
        )
        return output
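
# A minimal usage sketch (not part of the module), assuming the engine has
# already built the usual config objects and constructed the executor; the
# exact constructor signature lives in ExecutorBase and is not shown here.
# The typical flow is: profile available KV blocks, size the cache, then run
# model steps with the scheduler's block maps.
#
#   executor = GPUExecutor(...)  # constructed by the LLM engine in practice
#   num_gpu_blocks, num_cpu_blocks = executor.determine_num_available_blocks()
#   executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#   outputs = executor.execute_model(
#       seq_group_metadata_list=scheduled_seq_groups,  # from the scheduler
#       blocks_to_swap_in={},
#       blocks_to_swap_out={},
#       blocks_to_copy={},
#       num_lookahead_slots=0,
#   )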