neuron_executor.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. from typing import Dict, List, Set, Tuple
  2. from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
  3. from aphrodite.lora.request import LoRARequest
  4. from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
  5. from aphrodite.common.utils import make_async
  6. class NeuronExecutor(ExecutorBase):
  7. def _init_executor(self) -> None:
  8. assert (self.lora_config is
  9. None), "LoRA is not supported for Neuron backend."
  10. assert (not self.speculative_config
  11. ), "Speculative decoding not yet supported for Neuron backend."
  12. # Instantiate the worker and load the model to the device.
  13. self._init_worker()
  14. def _init_worker(self):
  15. from aphrodite.task_handler.neuron_worker import NeuronWorker
  16. self.driver_worker = NeuronWorker(
  17. self.model_config,
  18. self.parallel_config,
  19. self.scheduler_config,
  20. self.device_config,
  21. self.cache_config,
  22. )
  23. self.driver_worker.init_device()
  24. self.driver_worker.load_model()
  25. def determine_num_available_blocks(self) -> Tuple[int, int]:
  26. """Determine the number of available KV blocks by invoking the
  27. underlying worker.
  28. """
  29. return self.driver_worker.determine_num_available_blocks()
  30. def initialize_cache(self, num_gpu_blocks: int,
  31. num_cpu_blocks: int) -> None:
  32. """Initialize the KV cache by invoking the underlying worker.
  33. """
  34. self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
  35. def execute_model(self,
  36. seq_group_metadata_list: List[SequenceGroupMetadata],
  37. blocks_to_swap_in: Dict[int, int],
  38. blocks_to_swap_out: Dict[int, int],
  39. blocks_to_copy: Dict[int, List[int]],
  40. num_lookahead_slots: int) -> List[SamplerOutput]:
  41. assert (blocks_to_swap_in == {} and blocks_to_swap_out == {}
  42. and blocks_to_copy == {}), (
  43. "Cache operations are not supported for Neuron backend.")
  44. assert num_lookahead_slots == 0, (
  45. "lookahead not supported for Neuron backend.")
  46. output = self.driver_worker.execute_model(
  47. seq_group_metadata_list=seq_group_metadata_list)
  48. return output
  49. def add_lora(self, lora_request: LoRARequest) -> bool:
  50. return self.driver_worker.add_lora(lora_request)
  51. def remove_lora(self, lora_id: int) -> bool:
  52. return self.driver_worker.remove_lora(lora_id)
  53. def list_loras(self) -> Set[int]:
  54. return self.driver_worker.list_loras()
  55. def check_health(self) -> None:
  56. # NeuronExecutor will always be healthy as long as
  57. # it's running.
  58. return
  59. class NeuronExecutorAsync(NeuronExecutor, ExecutorAsyncBase):
  60. async def execute_model_async(
  61. self,
  62. seq_group_metadata_list: List[SequenceGroupMetadata],
  63. blocks_to_swap_in: Dict[int, int],
  64. blocks_to_swap_out: Dict[int, int],
  65. blocks_to_copy: Dict[int, List[int]],
  66. ) -> SamplerOutput:
  67. output = await make_async(self.driver_worker.execute_model)(
  68. seq_group_metadata_list=seq_group_metadata_list, )
  69. return output
  70. async def check_health_async(self) -> None:
  71. # NeuronExecutor will always be healthy as long as
  72. # it's running.
  73. return