# neuron_worker.py
"""A Neuron worker class."""
from typing import List, Tuple

import torch
import torch.distributed

from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
                                     ParallelConfig, SchedulerConfig)
from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
from aphrodite.modeling import set_random_seed
from aphrodite.task_handler.neuron_model_runner import NeuronModelRunner
from aphrodite.task_handler.worker_base import LoraNotSupportedWorkerBase
  16. class NeuronWorker(LoraNotSupportedWorkerBase):
  17. """A worker class that executes the model on a group of neuron cores."""
  18. def __init__(
  19. self,
  20. model_config: ModelConfig,
  21. parallel_config: ParallelConfig,
  22. scheduler_config: SchedulerConfig,
  23. device_config: DeviceConfig,
  24. cache_config: CacheConfig,
  25. ) -> None:
  26. self.model_config = model_config
  27. self.parallel_config = parallel_config
  28. self.scheduler_config = scheduler_config
  29. self.device_config = device_config
  30. self.cache_config = cache_config
  31. self.model_runner = NeuronModelRunner(model_config, parallel_config,
  32. scheduler_config, device_config)
  33. def init_device(self) -> None:
  34. # Set random seed.
  35. set_random_seed(self.model_config.seed)
  36. def load_model(self):
  37. self.model_runner.load_model()
  38. def determine_num_available_blocks(self) -> tuple[int, int]:
  39. """Determine the number of available KV blocks.
  40. Swapping is not yet supported, so always return num_cpu_blocks=0.
  41. We configure num_gpu_blocks to be equal to max_num_seqs.
  42. """
  43. # Set the number of GPU blocks to be the same as the maximum number of
  44. # sequences that can be processed in a single batch. This is equivalent
  45. # to schedule without PagedAttention.
  46. num_gpu_blocks = self.scheduler_config.max_num_seqs
  47. # Swap not yet supported with Neuron backend.
  48. num_cpu_blocks = 0
  49. return num_gpu_blocks, num_cpu_blocks
  50. def initialize_cache(self, num_gpu_blocks: int,
  51. num_cpu_blocks: int) -> None:
  52. """Initialize the KV cache.
  53. """
  54. # Different values are not tested.
  55. assert num_cpu_blocks == 0
  56. assert num_gpu_blocks == self.scheduler_config.max_num_seqs
  57. self.cache_config.num_gpu_blocks = num_gpu_blocks
  58. self.cache_config.num_cpu_blocks = num_cpu_blocks
  59. @torch.inference_mode()
  60. def execute_model(
  61. self,
  62. seq_group_metadata_list: List[SequenceGroupMetadata],
  63. ) -> List[SamplerOutput]:
  64. num_seq_groups = len(seq_group_metadata_list)
  65. # If there is no input, we don't need to execute the model.
  66. if num_seq_groups == 0:
  67. return []
  68. output = self.model_runner.execute_model(seq_group_metadata_list)
  69. # Neuron worker only supports single-step output. Wrap the output in a
  70. # list to conform to interface.
  71. return [output]
  72. def get_cache_block_size_bytes(self) -> int:
  73. """Determine the size in bytes of a cache block.
  74. This is required for speculative decoding; it is not yet implemented.
  75. """
  76. raise NotImplementedError