소스 검색

feat: `DistributedGPUExecutor` abstract class (#541)

AlpinDale 8 달 전
부모
커밋
c21af7acad
1개의 변경된 파일8개의 추가작업 그리고 94개의 파일을 삭제
  1. 8 94
      aphrodite/executor/ray_gpu_executor.py

+ 8 - 94
aphrodite/executor/ray_gpu_executor.py

@@ -3,17 +3,15 @@ import os
 import pickle
 from collections import defaultdict
 from itertools import islice, repeat
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
-
-from loguru import logger
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
 
 from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
 from aphrodite.common.utils import (get_aphrodite_instance_id,
                                     get_distributed_init_method, get_ip,
                                     get_open_port, make_async)
+from aphrodite.executor.distributed_gpu_executor import (
+    DistributedGPUExecutor, DistributedGPUExecutorAsync)
 from aphrodite.executor.ray_utils import RayWorkerWrapper, ray
-from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
-from aphrodite.lora.request import LoRARequest
 
 if ray is not None:
     from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
@@ -27,7 +25,7 @@ if TYPE_CHECKING:
 USE_RAY_COMPILED_DAG = bool(os.getenv("APHRODITE_USE_RAY_COMPILED_DAG", 0))
 
 
-class RayGPUExecutor(ExecutorBase):
+class RayGPUExecutor(DistributedGPUExecutor):
 
     def _init_executor(self) -> None:
         assert (not self.speculative_config
@@ -179,54 +177,9 @@ class RayGPUExecutor(ExecutorBase):
         self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
 
         self._run_workers("init_device")
-        self._run_workers(
-            "load_model",
-            max_concurrent_workers=self.parallel_config.
-            max_parallel_loading_workers,
-        )
-
-    def determine_num_available_blocks(self) -> Tuple[int, int]:
-        """Determine the number of available KV blocks.
-
-        This invokes `determine_num_available_blocks` on each worker and takes
-        the min of the results, guaranteeing that the selected cache sizes are
-        compatible with all workers.
-
-        Returns:
-            - Tuple[num_gpu_blocks, num_cpu_blocks]
-        """
-        # Get the maximum number of blocks that can be allocated on GPU and CPU.
-        num_blocks = self._run_workers("determine_num_available_blocks", )
-
-        # Since we use a shared centralized controller, we take the minimum
-        # number of blocks across all workers to make sure all the memory
-        # operators can be applied to all workers.
-        num_gpu_blocks = min(b[0] for b in num_blocks)
-        num_cpu_blocks = min(b[1] for b in num_blocks)
-
-        return num_gpu_blocks, num_cpu_blocks
-
-    def initialize_cache(self, num_gpu_blocks: int,
-                         num_cpu_blocks: int) -> None:
-        """Initialize the KV cache in all workers.
-        """
-
-        # NOTE: We log here to avoid multiple logs when number of workers is
-        # greater than one. We could log in the engine, but not all executors
-        # have GPUs.
-        logger.info(f"# GPU blocks: {num_gpu_blocks}, "
-                    f"# CPU blocks: {num_cpu_blocks}")
-
-        logger.info(
-            f"Minimum concurrency: {num_gpu_blocks * self.cache_config.block_size / self.scheduler_config.max_model_len:.2f}x"  # noqa: E501
-        )
-
-        self.cache_config.num_gpu_blocks = num_gpu_blocks
-        self.cache_config.num_cpu_blocks = num_cpu_blocks
-
-        self._run_workers("initialize_cache",
-                          num_gpu_blocks=num_gpu_blocks,
-                          num_cpu_blocks=num_cpu_blocks)
+        self._run_workers("load_model",
+                          max_concurrent_workers=self.parallel_config.
+                          max_parallel_loading_workers)
 
     def execute_model(self,
                       seq_group_metadata_list: List[SequenceGroupMetadata],
@@ -248,23 +201,6 @@ class RayGPUExecutor(ExecutorBase):
         output = all_outputs[0]
         return output
 
-    def add_lora(self, lora_request: LoRARequest) -> bool:
-        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
-        return self._run_workers(
-            "add_lora",
-            lora_request=lora_request,
-        )
-
-    def remove_lora(self, lora_id: int) -> bool:
-        assert lora_id > 0, "lora_id must be greater than 0."
-        return self._run_workers(
-            "remove_lora",
-            lora_id=lora_id,
-        )
-
-    def list_loras(self) -> Set[int]:
-        return self._run_workers("list_loras")
-
     def _run_workers(
         self,
         method: str,
@@ -380,7 +316,7 @@ class RayGPUExecutor(ExecutorBase):
                                f"Dead Workers: {dead_actors}. ")
 
 
-class RayGPUExecutorAsync(RayGPUExecutor, ExecutorAsyncBase):
+class RayGPUExecutorAsync(RayGPUExecutor, DistributedGPUExecutorAsync):
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
@@ -411,25 +347,3 @@ class RayGPUExecutorAsync(RayGPUExecutor, ExecutorAsyncBase):
 
         all_outputs = await asyncio.gather(*coros)
         return all_outputs
-
-    async def execute_model_async(
-        self,
-        seq_group_metadata_list: List[SequenceGroupMetadata],
-        blocks_to_swap_in: Dict[int, int],
-        blocks_to_swap_out: Dict[int, int],
-        blocks_to_copy: Dict[int, List[int]],
-        num_lookahead_slots: int,
-    ) -> SamplerOutput:
-        all_outputs = await self._run_workers_async(
-            "execute_model",
-            driver_kwargs={
-                "seq_group_metadata_list": seq_group_metadata_list,
-                "blocks_to_swap_in": blocks_to_swap_in,
-                "blocks_to_swap_out": blocks_to_swap_out,
-                "blocks_to_copy": blocks_to_copy,
-                "num_lookahead_slots": num_lookahead_slots,
-            })
-
-        # Only the driver worker returns the sampling results.
-        output = all_outputs[0]
-        return output