neuron_worker.py

"""A Neuron worker class."""
from typing import List, Optional, Tuple

import torch
import torch.distributed

from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
                                     ParallelConfig, SchedulerConfig)
from aphrodite.common.sequence import ExecuteModelRequest
from aphrodite.modeling import set_random_seed
from aphrodite.task_handler.neuron_model_runner import NeuronModelRunner
from aphrodite.task_handler.worker_base import (LocalOrDistributedWorkerBase,
                                                LoraNotSupportedWorkerBase,
                                                WorkerInput)


class NeuronWorker(LoraNotSupportedWorkerBase, LocalOrDistributedWorkerBase):
    """A worker class that executes the model on a group of neuron cores."""

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config
        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from aphrodite.common.utils import init_cached_hf_modules
            init_cached_hf_modules()

        self.model_runner: NeuronModelRunner = NeuronModelRunner(
            model_config, parallel_config, scheduler_config, device_config)
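        # There is no separate worker group on the Neuron backend, so this
        # worker always acts as the driver.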
        self.is_driver_worker = True

    def init_device(self) -> None:
        # Set random seed.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks.

        Swapping is not yet supported, so always return num_cpu_blocks=0.

        We configure num_gpu_blocks to be equal to max_num_seqs.
        """
        # Set the number of GPU blocks to be the same as the maximum number
        # of sequences that can be processed in a single batch. This is
        # equivalent to scheduling without PagedAttention.
        num_gpu_blocks = self.scheduler_config.max_num_seqs

        # Swap not yet supported with Neuron backend.
        num_cpu_blocks = 0
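
        # For example, with max_num_seqs=256 this returns (256, 0): one
        # "GPU" block per sequence slot and no CPU swap blocks.
        # (Illustrative value; max_num_seqs comes from the scheduler config.)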
        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache."""

        # Different values are not tested.
        assert num_cpu_blocks == 0
        assert num_gpu_blocks == self.scheduler_config.max_num_seqs

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks
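
    # With a single driver worker there is no control-plane metadata to
    # broadcast, and this worker exposes no worker-managed KV cache tensors;
    # the two properties below encode both facts for
    # LocalOrDistributedWorkerBase.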

    @property
    def do_metadata_broadcast(self) -> bool:
        return False

    @property
    def kv_cache(self) -> Optional[List[List[torch.Tensor]]]:
        return None

    @torch.inference_mode()
    def prepare_worker_input(
            self, execute_model_req: ExecuteModelRequest) -> WorkerInput:
        return WorkerInput(
            num_seq_groups=len(execute_model_req.seq_group_metadata_list))

    def execute_worker(self, worker_input: WorkerInput) -> None:
        pass

    def get_cache_block_size_bytes(self) -> int:
        """Determine the size in bytes of a cache block.

        This is required for speculative decoding; it is not yet implemented.
        """
        raise NotImplementedError
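

# A minimal sketch of how the engine is expected to drive this worker,
# written as comments since the exact wiring lives elsewhere in aphrodite.
# The config objects and `execute_model_req` below are assumed to be built
# by the engine; `execute_model` is inherited from
# LocalOrDistributedWorkerBase:
#
#     worker = NeuronWorker(model_config, parallel_config, scheduler_config,
#                           device_config, cache_config)
#     worker.init_device()
#     worker.load_model()
#     num_gpu_blocks, num_cpu_blocks = (
#         worker.determine_num_available_blocks())
#     worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#     outputs = worker.execute_model(execute_model_req)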