1
0

gpu_executor.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. from typing import Dict, List, Set, Tuple
  2. from loguru import logger
  3. from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
  4. from aphrodite.common.utils import (get_distributed_init_method, get_ip,
  5. get_open_port, make_async)
  6. from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
  7. from aphrodite.lora.request import LoRARequest
  8. class GPUExecutor(ExecutorBase):
  9. def _init_executor(self) -> None:
  10. """Initialize the worker and load the model.
  11. If speculative decoding is enabled, we instead create the speculative
  12. worker.
  13. """
  14. if self.speculative_config is None:
  15. self._init_non_spec_worker()
  16. else:
  17. self._init_spec_worker()
  18. def _init_non_spec_worker(self):
  19. # Lazy import the Worker to avoid importing torch.cuda/xformers
  20. # before CUDA_VISIBLE_DEVICES is set in the Worker
  21. from aphrodite.task_handler.worker import Worker
  22. assert self.parallel_config.world_size == 1, (
  23. "GPUExecutor only supports single GPU.")
  24. distributed_init_method = get_distributed_init_method(
  25. get_ip(), get_open_port())
  26. self.driver_worker = Worker(
  27. model_config=self.model_config,
  28. parallel_config=self.parallel_config,
  29. scheduler_config=self.scheduler_config,
  30. device_config=self.device_config,
  31. cache_config=self.cache_config,
  32. load_config=self.load_config,
  33. local_rank=0,
  34. rank=0,
  35. distributed_init_method=distributed_init_method,
  36. lora_config=self.lora_config,
  37. vision_language_config=self.vision_language_config,
  38. is_driver_worker=True,
  39. )
  40. self.driver_worker.init_device()
  41. self.driver_worker.load_model()
  42. def _init_spec_worker(self):
  43. """Initialize a SpecDecodeWorker, using a draft model for proposals.
  44. """
  45. assert self.speculative_config is not None
  46. from aphrodite.spec_decode.multi_step_worker import MultiStepWorker
  47. from aphrodite.spec_decode.spec_decode_worker import SpecDecodeWorker
  48. from aphrodite.task_handler.worker import Worker
  49. distributed_init_method = get_distributed_init_method(
  50. get_ip(), get_open_port())
  51. target_worker = Worker(
  52. model_config=self.model_config,
  53. parallel_config=self.parallel_config,
  54. scheduler_config=self.scheduler_config,
  55. device_config=self.device_config,
  56. cache_config=self.cache_config,
  57. load_config=self.load_config,
  58. local_rank=0,
  59. rank=0,
  60. distributed_init_method=distributed_init_method,
  61. lora_config=self.lora_config,
  62. vision_language_config=self.vision_language_config,
  63. is_driver_worker=True,
  64. )
  65. draft_worker = MultiStepWorker(
  66. model_config=self.speculative_config.draft_model_config,
  67. parallel_config=self.speculative_config.draft_parallel_config,
  68. scheduler_config=self.scheduler_config,
  69. device_config=self.device_config,
  70. cache_config=self.cache_config,
  71. # TODO allow draft-model specific load config.
  72. load_config=self.load_config,
  73. local_rank=0,
  74. rank=0,
  75. distributed_init_method=distributed_init_method,
  76. lora_config=self.lora_config,
  77. vision_language_config=self.vision_language_config,
  78. is_driver_worker=True,
  79. )
  80. spec_decode_worker = SpecDecodeWorker.from_workers(
  81. proposer_worker=draft_worker, scorer_worker=target_worker)
  82. assert self.parallel_config.world_size == 1, (
  83. "GPUExecutor only supports single GPU.")
  84. self.driver_worker = spec_decode_worker
  85. # Load model handled in spec decode worker.
  86. self.driver_worker.init_device()
  87. def determine_num_available_blocks(self) -> Tuple[int, int]:
  88. """Determine the number of available KV blocks by invoking the
  89. underlying worker.
  90. """
  91. return self.driver_worker.determine_num_available_blocks()
  92. def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks) -> None:
  93. """Initialize the KV cache by invoking the underlying worker.
  94. """
  95. # NOTE: This is logged in the executor because there can be >1 worker
  96. # with other executors. We could log in the engine level, but work
  97. # remains to abstract away the device for non-GPU configurations.
  98. logger.info(f"# GPU blocks: {num_gpu_blocks}, "
  99. f"# CPU blocks: {num_cpu_blocks}")
  100. logger.info(
  101. f"Minimum concurrency: {num_gpu_blocks * self.cache_config.block_size / self.scheduler_config.max_model_len:.2f}x" # noqa: E501
  102. )
  103. self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
  104. def execute_model(
  105. self,
  106. seq_group_metadata_list: List[SequenceGroupMetadata],
  107. blocks_to_swap_in: Dict[int, int],
  108. blocks_to_swap_out: Dict[int, int],
  109. blocks_to_copy: Dict[int, List[int]],
  110. num_lookahead_slots: int,
  111. ) -> List[SamplerOutput]:
  112. output = self.driver_worker.execute_model(
  113. seq_group_metadata_list=seq_group_metadata_list,
  114. blocks_to_swap_in=blocks_to_swap_in,
  115. blocks_to_swap_out=blocks_to_swap_out,
  116. blocks_to_copy=blocks_to_copy,
  117. num_lookahead_slots=num_lookahead_slots,
  118. )
  119. return output
  120. def add_lora(self, lora_request: LoRARequest) -> bool:
  121. assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
  122. return self.driver_worker.add_lora(lora_request)
  123. def remove_lora(self, lora_id: int) -> bool:
  124. assert lora_id > 0, "lora_id must be greater than 0."
  125. return self.driver_worker.remove_lora(lora_id)
  126. def list_loras(self) -> Set[int]:
  127. return self.driver_worker.list_loras()
  128. def check_health(self) -> None:
  129. # GPUExecutor will always be healthy as long as
  130. # it's running.
  131. return
  132. class GPUExecutorAsync(GPUExecutor, ExecutorAsyncBase):
  133. async def execute_model_async(
  134. self,
  135. seq_group_metadata_list: List[SequenceGroupMetadata],
  136. blocks_to_swap_in: Dict[int, int],
  137. blocks_to_swap_out: Dict[int, int],
  138. blocks_to_copy: Dict[int, List[int]],
  139. num_lookahead_slots: int,
  140. ) -> SamplerOutput:
  141. output = await make_async(self.driver_worker.execute_model)(
  142. seq_group_metadata_list=seq_group_metadata_list,
  143. blocks_to_swap_in=blocks_to_swap_in,
  144. blocks_to_swap_out=blocks_to_swap_out,
  145. blocks_to_copy=blocks_to_copy,
  146. num_lookahead_slots=num_lookahead_slots)
  147. return output