# executor_base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from aphrodite.common.config import (CacheConfig, DeviceConfig, ModelConfig,
                                     ParallelConfig, SchedulerConfig,
                                     LoRAConfig, VisionLanguageConfig,
                                     SpeculativeConfig)
from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
from aphrodite.lora.request import LoRARequest
  9. class ExecutorBase(ABC):
  10. """Base class for all executors.
  11. An executor is responsible for executing the model on a specific device
  12. type (e.g., CPU, GPU, Neuron, etc.). Or it can be a distributed executor
  13. that can execute the model on multiple devices.
  14. """
  15. @abstractmethod
  16. def __init__(
  17. self,
  18. model_config: ModelConfig,
  19. cache_config: CacheConfig,
  20. parallel_config: ParallelConfig,
  21. scheduler_config: SchedulerConfig,
  22. device_config: DeviceConfig,
  23. lora_config: Optional[LoRAConfig],
  24. vision_language_config: Optional[VisionLanguageConfig],
  25. speculative_config: Optional[SpeculativeConfig],
  26. ) -> None:
  27. raise NotImplementedError
  28. @abstractmethod
  29. def determine_num_available_blocks(self) -> Tuple[int, int]:
  30. """Determine the number of available blocks for the GPU KV cache and
  31. swappable CPU KV cache.
  32. Normally, this should simply delegate to the underlying Worker. Some
  33. ExecutorBase may require modification of the result, e.g. to ensure the
  34. selected cache sizes are compatible with all workers.
  35. Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
  36. are blocks that are "active" on the device and can be appended to.
  37. num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
  38. appended to.
  39. """
  40. raise NotImplementedError
  41. @abstractmethod
  42. def initialize_cache(self, num_gpu_blocks: int,
  43. num_cpu_blocks: int) -> None:
  44. """Initialize the KV cache with the given size in blocks.
  45. """
  46. raise NotImplementedError
  47. @abstractmethod
  48. def execute_model(
  49. self,
  50. seq_group_metadata_list: List[SequenceGroupMetadata],
  51. blocks_to_swap_in: Dict[int, int],
  52. blocks_to_swap_out: Dict[int, int],
  53. blocks_to_copy: Dict[int, List[int]],
  54. ) -> SamplerOutput:
  55. """Executes one model step on the given sequences."""
  56. raise NotImplementedError
  57. @abstractmethod
  58. def add_lora(self, lora_request: LoRARequest) -> bool:
  59. raise NotImplementedError
  60. @abstractmethod
  61. def remove_lora(self, lora_id: int) -> bool:
  62. raise NotImplementedError
  63. @abstractmethod
  64. def list_loras(self) -> List[int]:
  65. raise NotImplementedError
  66. @abstractmethod
  67. def check_health(self) -> None:
  68. """Checks if the executor is healthy. If not, it should raise an
  69. exception."""
  70. raise NotImplementedError
  71. @abstractmethod
  72. def release_mamba_cache(self, requests_id: List[str]) -> None:
  73. raise NotImplementedError
  74. class ExecutorAsyncBase(ExecutorBase):
  75. @abstractmethod
  76. async def execute_model_async(
  77. self,
  78. seq_group_metadata_list: List[SequenceGroupMetadata],
  79. blocks_to_swap_in: Dict[int, int],
  80. blocks_to_swap_out: Dict[int, int],
  81. blocks_to_copy: Dict[int, List[int]],
  82. ) -> SamplerOutput:
  83. """Executes one model step on the given sequences."""
  84. raise NotImplementedError
  85. @abstractmethod
  86. async def check_health_async(self) -> None:
  87. """Checks if the executor is healthy. If not, it should raise an
  88. exception."""
  89. raise NotImplementedError