1
0

neuron_worker.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. """A Neuron worker class."""
  2. from typing import List, Tuple
  3. import torch
  4. import torch.distributed
  5. from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
  6. ParallelConfig, SchedulerConfig)
  7. from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
  8. from aphrodite.modeling import set_random_seed
  9. from aphrodite.task_handler.neuron_model_runner import NeuronModelRunner
  10. from aphrodite.task_handler.worker_base import LoraNotSupportedWorkerBase
  11. class NeuronWorker(LoraNotSupportedWorkerBase):
  12. """A worker class that executes the model on a group of neuron cores.
  13. """
  14. def __init__(
  15. self,
  16. model_config: ModelConfig,
  17. parallel_config: ParallelConfig,
  18. scheduler_config: SchedulerConfig,
  19. device_config: DeviceConfig,
  20. cache_config: CacheConfig,
  21. ) -> None:
  22. self.model_config = model_config
  23. self.parallel_config = parallel_config
  24. self.scheduler_config = scheduler_config
  25. self.device_config = device_config
  26. self.cache_config = cache_config
  27. if self.model_config.trust_remote_code:
  28. # note: lazy import to avoid importing torch before initializing
  29. from aphrodite.common.utils import init_cached_hf_modules
  30. init_cached_hf_modules()
  31. self.model_runner = NeuronModelRunner(model_config, parallel_config,
  32. scheduler_config, device_config)
  33. def init_device(self) -> None:
  34. # Set random seed.
  35. set_random_seed(self.model_config.seed)
  36. def load_model(self):
  37. self.model_runner.load_model()
  38. def determine_num_available_blocks(self) -> Tuple[int, int]:
  39. """Determine the number of available KV blocks.
  40. Swapping is not yet supported, so always return num_cpu_blocks=0.
  41. We configure num_gpu_blocks to be equal to max_num_seqs.
  42. """
  43. # Set the number of GPU blocks to be the same as the maximum number of
  44. # sequences that can be processed in a single batch. This is equivalent
  45. # to schedule without PagedAttention.
  46. num_gpu_blocks = self.scheduler_config.max_num_seqs
  47. # Swap not yet supported with Neuron backend.
  48. num_cpu_blocks = 0
  49. return num_gpu_blocks, num_cpu_blocks
  50. def initialize_cache(self, num_gpu_blocks: int,
  51. num_cpu_blocks: int) -> None:
  52. """Initialize the KV cache.
  53. """
  54. # Different values are not tested.
  55. assert num_cpu_blocks == 0
  56. assert num_gpu_blocks == self.scheduler_config.max_num_seqs
  57. self.cache_config.num_gpu_blocks = num_gpu_blocks
  58. self.cache_config.num_cpu_blocks = num_cpu_blocks
  59. @torch.inference_mode()
  60. def execute_model(
  61. self,
  62. seq_group_metadata_list: List[SequenceGroupMetadata],
  63. ) -> List[SamplerOutput]:
  64. num_seq_groups = len(seq_group_metadata_list)
  65. # If there is no input, we don't need to execute the model.
  66. if num_seq_groups == 0:
  67. return []
  68. output = self.model_runner.execute_model(seq_group_metadata_list)
  69. # Neuron worker only supports single-step output. Wrap the output in a
  70. # list to conform to interface.
  71. return [output]
  72. def get_cache_block_size_bytes(self) -> int:
  73. """Determine the size in bytes of a cache block.
  74. This is required for speculative decoding; it is not yet implemented.
  75. """
  76. raise NotImplementedError