neuron_worker.py

  1. """A Neuron worker class."""
  2. from typing import List
  3. import torch
  4. import torch.distributed
  5. from aphrodite.common.config import (
  6. CacheConfig,
  7. DeviceConfig,
  8. ModelConfig,
  9. ParallelConfig,
  10. SchedulerConfig,
  11. )
  12. from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
  13. from aphrodite.modeling import set_random_seed
  14. from aphrodite.task_handler.neuron_model_runner import NeuronModelRunner
  15. from aphrodite.task_handler.worker_base import LoraNotSupportedWorkerBase


class NeuronWorker(LoraNotSupportedWorkerBase):
    """A worker class that executes the model on a group of neuron cores."""

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config
        if self.model_config.trust_remote_code:
            # Lazy import: only set up the HF modules cache when remote
            # code is trusted.
            from aphrodite.common.utils import init_cached_hf_modules
            init_cached_hf_modules()
        self.model_runner = NeuronModelRunner(model_config, parallel_config,
                                              scheduler_config, device_config)

    def init_device(self) -> None:
        # Set random seed.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks.

        Swapping is not yet supported, so always return num_cpu_blocks=0.

        We configure num_gpu_blocks to be equal to max_num_seqs.
        """
        # Set the number of GPU blocks to be the same as the maximum number
        # of sequences that can be processed in a single batch. This is
        # equivalent to scheduling without PagedAttention.
        num_gpu_blocks = self.scheduler_config.max_num_seqs

        # Swap is not yet supported with the Neuron backend.
        num_cpu_blocks = 0

        return num_gpu_blocks, num_cpu_blocks
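
    # Illustrative note (not in the original source): since exactly one
    # block is allocated per sequence, a "block" here stands in for a
    # sequence's entire KV cache; the backend does not page the cache the
    # way PagedAttention does on GPUs, matching the comment above.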

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache."""
        # Different values are not tested.
        assert num_cpu_blocks == 0
        assert num_gpu_blocks == self.scheduler_config.max_num_seqs

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

    @torch.inference_mode()
    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
    ) -> List[SamplerOutput]:
        num_seq_groups = len(seq_group_metadata_list)

        # If there is no input, we don't need to execute the model.
        if num_seq_groups == 0:
            return []

        output = self.model_runner.execute_model(seq_group_metadata_list)

        # The Neuron worker only supports single-step output. Wrap the
        # output in a list to conform to the interface.
        return [output]

    def get_cache_block_size_bytes(self) -> int:
        """Determine the size in bytes of a cache block.

        This is required for speculative decoding; it is not yet implemented.
        """
        raise NotImplementedError
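

# A minimal usage sketch (illustrative only, not part of the original file):
# the engine normally drives a worker through this call sequence, with the
# config objects constructed elsewhere by the engine.
#
#     worker = NeuronWorker(model_config, parallel_config, scheduler_config,
#                           device_config, cache_config)
#     worker.init_device()
#     worker.load_model()
#     num_gpu_blocks, num_cpu_blocks = worker.determine_num_available_blocks()
#     worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
#     outputs = worker.execute_model(seq_group_metadata_list)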