# neuron_executor.py

from typing import Dict, List, Optional

from aphrodite.lora.request import LoRARequest
from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
                                     ParallelConfig, SchedulerConfig,
                                     LoRAConfig, VisionLanguageConfig,
                                     SpeculativeConfig)
from aphrodite.executor.executor_base import ExecutorBase
from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata


class NeuronExecutor(ExecutorBase):
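    """Executor that runs the model on AWS Neuron devices through a
    single in-process worker.

    LoRA, speculative decoding, and KV-cache block swapping/copying
    are not supported on this backend (see the assertions below).
    """
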
    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        lora_config: Optional[LoRAConfig],
        vision_language_config: Optional[VisionLanguageConfig],
        speculative_config: Optional[SpeculativeConfig],
    ) -> None:
        self.model_config = model_config
        self.cache_config = cache_config
        assert lora_config is None, "LoRA is not supported for Neuron backend."
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        assert not speculative_config, (
            "Speculative decoding not yet supported for Neuron backend.")

        # Instantiate the worker and load the model to the device.
        self._init_worker()

    def _init_worker(self):
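        # Imported lazily so that importing this module does not require
        # the Neuron runtime dependencies to be installed.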
        from aphrodite.task_handler.neuron_worker import NeuronWorker

        self.driver_worker = NeuronWorker(
            self.model_config,
            self.parallel_config,
            self.scheduler_config,
            self.device_config,
            self.cache_config,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def determine_num_available_blocks(self) -> tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker."""
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

    def execute_model(self,
                      seq_group_metadata_list: List[SequenceGroupMetadata],
                      blocks_to_swap_in: Dict[int, int],
                      blocks_to_swap_out: Dict[int, int],
                      blocks_to_copy: Dict[int, List[int]]) -> SamplerOutput:
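        """Run one model step on the driver worker.

        All swap/copy maps must be empty: the Neuron backend performs
        no KV-cache block operations.
        """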
        assert (blocks_to_swap_in == {} and blocks_to_swap_out == {}
                and blocks_to_copy == {}), (
                    "Cache operations are not supported for Neuron backend.")
        output = self.driver_worker.execute_model(
            seq_group_metadata_list=seq_group_metadata_list)
        return output
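
    # The LoRA hooks below forward to the worker to satisfy the
    # ExecutorBase interface; __init__ asserts lora_config is None,
    # so LoRA is not actually usable on this backend.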
    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> List[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # NeuronExecutor is always healthy as long as it's running.
        return
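

# Usage sketch (illustrative, not part of this file): the engine normally
# builds the config objects from its parsed arguments and constructs the
# executor itself. Assuming the configs are already in hand, the call
# sequence looks like:
#
#     executor = NeuronExecutor(
#         model_config, cache_config, parallel_config, scheduler_config,
#         device_config, lora_config=None, vision_language_config=None,
#         speculative_config=None)
#     num_gpu_blocks, num_cpu_blocks = (
#         executor.determine_num_available_blocks())
#     executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#     output = executor.execute_model(seq_group_metadata_list,
#                                     blocks_to_swap_in={},
#                                     blocks_to_swap_out={},
#                                     blocks_to_copy={})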