from abc import abstractmethod
from typing import Any, Dict, Optional, Set, Tuple

from loguru import logger

from aphrodite.common.sequence import SamplerOutput
from aphrodite.executor.executor_base import ExecutorAsyncBase
from aphrodite.executor.gpu_executor import GPUExecutor
from aphrodite.lora.request import LoRARequest


class DistributedGPUExecutor(GPUExecutor):
    """Abstract superclass of multi-GPU executor implementations."""

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks.

        This invokes `determine_num_available_blocks` on each worker and takes
        the min of the results, guaranteeing that the selected cache sizes are
        compatible with all workers.

        Returns:
            - tuple[num_gpu_blocks, num_cpu_blocks]
        """
        # Get the maximum number of blocks that can be allocated on GPU and
        # CPU.
        num_blocks = self._run_workers("determine_num_available_blocks")

        # Since we use a shared centralized controller, we take the minimum
        # number of blocks across all workers to make sure all the memory
        # operators can be applied to all workers.
        num_gpu_blocks = min(b[0] for b in num_blocks)
        num_cpu_blocks = min(b[1] for b in num_blocks)

        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache in all workers."""

        # NOTE: We log here to avoid multiple logs when the number of workers
        # is greater than one. We could log in the engine, but not all
        # executors have GPUs.
        # loguru formats messages with `{}`-style placeholders, so f-strings
        # are used instead of printf-style `%d` placeholders.
        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
                    f"# CPU blocks: {num_cpu_blocks}")
        logger.info(
            f"Minimum concurrency: {num_gpu_blocks * self.cache_config.block_size / self.scheduler_config.max_model_len:.2f}x"  # noqa: E501
        )

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        self._run_workers("initialize_cache",
                          num_gpu_blocks=num_gpu_blocks,
                          num_cpu_blocks=num_cpu_blocks)

    def execute_model(self, *args, **kwargs) -> SamplerOutput:
        all_outputs = self._run_workers("execute_model",
                                        driver_args=args,
                                        driver_kwargs=kwargs)

        # Only the driver worker returns the sampling results.
        return all_outputs[0]

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return self._run_workers(
            "add_lora",
            lora_request=lora_request,
        )

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return self._run_workers(
            "remove_lora",
            lora_id=lora_id,
        )

    def list_loras(self) -> Set[int]:
        return self._run_workers("list_loras")

    @abstractmethod
    def _run_workers(
        self,
        method: str,
        *args,
        driver_args: Optional[Tuple[Any, ...]] = None,
        driver_kwargs: Optional[Dict[str, Any]] = None,
        max_concurrent_workers: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers."""
        raise NotImplementedError


class DistributedGPUExecutorAsync(DistributedGPUExecutor, ExecutorAsyncBase):

    @abstractmethod
    async def _run_workers_async(
        self,
        method: str,
        *args,
        driver_args: Optional[Tuple[Any, ...]] = None,
        driver_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers."""
        raise NotImplementedError

    async def execute_model_async(self, *args, **kwargs) -> SamplerOutput:
        all_outputs = await self._run_workers_async("execute_model",
                                                    driver_args=args,
                                                    driver_kwargs=kwargs)

        # Only the driver worker returns the sampling results.
        return all_outputs[0]
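
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module's API): concrete subclasses are
# expected to implement ``_run_workers`` by invoking ``method`` on every
# worker, passing ``driver_args``/``driver_kwargs`` to the driver worker when
# they are provided, and returning the per-worker results with the driver's
# output first (``execute_model`` reads ``all_outputs[0]``). A hypothetical,
# purely local implementation could look like the following;
# ``self.driver_worker`` and ``self.workers`` are assumed attributes and are
# not defined in this module.
#
#     class LocalDistributedGPUExecutor(DistributedGPUExecutor):
#
#         def _run_workers(self, method, *args, driver_args=None,
#                          driver_kwargs=None, max_concurrent_workers=None,
#                          **kwargs):
#             if driver_args is None:
#                 driver_args = args
#             if driver_kwargs is None:
#                 driver_kwargs = kwargs
#             # The driver worker runs first with its (possibly different)
#             # arguments; only its output is consumed by ``execute_model``.
#             driver_output = getattr(self.driver_worker,
#                                     method)(*driver_args, **driver_kwargs)
#             # The remaining workers receive the broadcast arguments.
#             worker_outputs = [
#                 getattr(worker, method)(*args, **kwargs)
#                 for worker in self.workers
#             ]
#             return [driver_output] + worker_outputs
# ---------------------------------------------------------------------------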