support pipeline parallel pynccl groups

AlpinDale 7 months ago
parent
commit
5b0c11d190

+ 12 - 6
aphrodite/distributed/communication_op.py

@@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
 import torch
 from torch.distributed import ProcessGroup
 
-from .parallel_state import (get_cpu_world_group,
+from .parallel_state import (get_cpu_world_group, get_pp_pynccl_communicator,
                              get_tensor_model_parallel_group,
                              get_tensor_model_parallel_rank,
                              get_tensor_model_parallel_world_size,
@@ -54,13 +54,19 @@ def graph_capture():
         # graph, we use either custom all-reduce kernel or PyTorch NCCL.
         # We always prioritize using custom all-reduce kernel but fall back
         # to PyTorch or pynccl if it is disabled or not supported.
-        pynccl_comm = get_tp_pynccl_communicator()
-        if pynccl_comm is None:
-            maybe_pynccl_context = nullcontext()
+        tp_pynccl_comm = get_tp_pynccl_communicator()
+        pp_pynccl_comm = get_pp_pynccl_communicator()
+        if not tp_pynccl_comm:
+            maybe_tp_pynccl_context = nullcontext()
         else:
-            maybe_pynccl_context = pynccl_comm.change_state(
+            maybe_tp_pynccl_context = tp_pynccl_comm.change_state(
                 enable=True, stream=torch.cuda.current_stream())
-        with maybe_pynccl_context:
+        if not pp_pynccl_comm:
+            maybe_pp_pynccl_context = nullcontext()
+        else:
+            maybe_pp_pynccl_context = pp_pynccl_comm.change_state(
+                enable=True, stream=torch.cuda.current_stream())
+        with maybe_tp_pynccl_context, maybe_pp_pynccl_context:
             yield graph_capture_context
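
The hunk above replaces the single TP-only context with one optional change_state() context per communicator, falling back to nullcontext() whenever a communicator was not created. Not part of this commit, but as an illustration of the same pattern: the sketch below uses contextlib.ExitStack to enter any number of optional communicators instead of nesting two with-statements; _enter_optional_comms and the comms list are hypothetical names.

    from contextlib import ExitStack, nullcontext
    import torch

    def _enter_optional_comms(stack: ExitStack, comms) -> None:
        # Enter a change_state() context for every communicator that exists;
        # missing (None) communicators degrade to a no-op nullcontext().
        for comm in comms:
            ctx = nullcontext() if comm is None else comm.change_state(
                enable=True, stream=torch.cuda.current_stream())
            stack.enter_context(ctx)

    # Hypothetical usage during graph capture:
    # with ExitStack() as stack:
    #     _enter_optional_comms(stack, [tp_pynccl_comm, pp_pynccl_comm])
    #     ...  # capture the CUDA graph here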
 
 

+ 34 - 0
aphrodite/distributed/device_communicators/pynccl.py

@@ -125,6 +125,40 @@ class PyNcclCommunicator:
                                 ncclRedOpTypeEnum.from_torch(op), self.comm,
                                 cudaStream_t(stream.cuda_stream))
 
+    def send(self,
+             tensor: torch.Tensor,
+             dst: Optional[int] = None,
+             stream=None):
+        if self.disabled:
+            return
+        assert tensor.device == self.device, (
+            f"this nccl communicator is created to work on {self.device}, "
+            f"but the input tensor is on {tensor.device}")
+        if stream is None:
+            stream = self.stream
+        if dst is None:
+            dst = (self.rank + 1) % self.world_size
+        self.nccl.ncclSend(buffer_type(tensor.data_ptr()), tensor.numel(),
+                           ncclDataTypeEnum.from_torch(tensor.dtype), dst,
+                           self.comm, cudaStream_t(stream.cuda_stream))
+
+    def recv(self,
+             tensor: torch.Tensor,
+             src: Optional[int] = None,
+             stream=None):
+        if self.disabled:
+            return
+        assert tensor.device == self.device, (
+            f"this nccl communicator is created to work on {self.device}, "
+            f"but the input tensor is on {tensor.device}")
+        if stream is None:
+            stream = self.stream
+        if src is None:
+            src = (self.rank - 1) % self.world_size
+        self.nccl.ncclRecv(buffer_type(tensor.data_ptr()), tensor.numel(),
+                           ncclDataTypeEnum.from_torch(tensor.dtype), src,
+                           self.comm, cudaStream_t(stream.cuda_stream))
+
     @contextmanager
     def change_state(self,
                      enable: Optional[bool] = None,
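
The new send()/recv() methods are point-to-point wrappers around ncclSend/ncclRecv; by default they target the next and previous rank in the communicator, which is exactly the neighbor pattern a pipeline stage needs. Below is a hedged sketch of how they might be driven in a pipeline forward pass, not part of this commit: comm is assumed to be a PyNcclCommunicator built over the pipeline-parallel group, stage_fn is a hypothetical per-stage compute function, and the tensor shape/dtype must agree on both sides of every send/recv pair or the ranks will hang.

    import torch

    def run_pipeline_stage(comm, stage_fn, shape, dtype, device):
        if comm.rank == 0:
            # The first stage produces its own input (placeholder here).
            hidden = torch.zeros(shape, dtype=dtype, device=device)
        else:
            # Later stages receive from the previous stage;
            # recv() defaults to src = (rank - 1) % world_size.
            hidden = torch.empty(shape, dtype=dtype, device=device)
            comm.recv(hidden)
        hidden = stage_fn(hidden)
        if comm.rank != comm.world_size - 1:
            # Every stage but the last forwards its output;
            # send() defaults to dst = (rank + 1) % world_size.
            comm.send(hidden)
        return hidden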

+ 26 - 0
aphrodite/distributed/device_communicators/pynccl_wrapper.py

@@ -149,6 +149,22 @@ class NCCLLibrary:
             ncclRedOp_t, ncclComm_t, cudaStream_t
         ]),
 
+        # ncclResult_t  ncclSend(
+        #   const void* sendbuff, size_t count, ncclDataType_t datatype,
+        #   int dest, ncclComm_t comm, cudaStream_t stream);
+        Function("ncclSend", ncclResult_t, [
+            buffer_type, ctypes.c_size_t, ncclDataType_t, ctypes.c_int,
+            ncclComm_t, cudaStream_t
+        ]),
+
+        # ncclResult_t  ncclRecv(
+        #   void* recvbuff, size_t count, ncclDataType_t datatype,
+        #   int src, ncclComm_t comm, cudaStream_t stream);
+        Function("ncclRecv", ncclResult_t, [
+            buffer_type, ctypes.c_size_t, ncclDataType_t, ctypes.c_int,
+            ncclComm_t, cudaStream_t
+        ]),
+
         # be cautious! this is a collective call, it will block until all
         # processes in the communicator have called this function.
         # because Python object destruction can happen in random order,
@@ -247,6 +263,16 @@ class NCCLLibrary:
                                                      datatype, op, comm,
                                                      stream))
 
+    def ncclSend(self, sendbuff: buffer_type, count: int, datatype: int,
+                 dest: int, comm: ncclComm_t, stream: cudaStream_t) -> None:
+        self.NCCL_CHECK(self._funcs["ncclSend"](sendbuff, count, datatype,
+                                                dest, comm, stream))
+
+    def ncclRecv(self, recvbuff: buffer_type, count: int, datatype: int,
+                 src: int, comm: ncclComm_t, stream: cudaStream_t) -> None:
+        self.NCCL_CHECK(self._funcs["ncclRecv"](recvbuff, count, datatype, src,
+                                                comm, stream))
+
     def ncclCommDestroy(self, comm: ncclComm_t) -> None:
         self.NCCL_CHECK(self._funcs["ncclCommDestroy"](comm))
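
The wrapper changes register ncclSend/ncclRecv in the function table and expose them as thin methods that route every return code through NCCL_CHECK. For readers unfamiliar with the mechanism, each Function descriptor amounts to an ordinary ctypes prototype; a minimal standalone sketch follows. It is not part of this commit: loading "libnccl.so.2" directly via CDLL is an assumption, while the repo resolves the library path itself.

    import ctypes

    lib = ctypes.CDLL("libnccl.so.2")       # assumption: NCCL 2.x on the loader path
    lib.ncclSend.restype = ctypes.c_int     # ncclResult_t is an int-sized enum
    lib.ncclSend.argtypes = [
        ctypes.c_void_p,    # const void* sendbuff
        ctypes.c_size_t,    # size_t count
        ctypes.c_int,       # ncclDataType_t datatype
        ctypes.c_int,       # int dest (peer rank inside the communicator)
        ctypes.c_void_p,    # ncclComm_t comm
        ctypes.c_void_p,    # cudaStream_t stream
    ]
    # A non-zero return code signals an NCCL error, which the repo's
    # NCCL_CHECK helper converts into a Python exception.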
 

+ 29 - 5
aphrodite/distributed/parallel_state.py

@@ -19,6 +19,8 @@ _TP_PYNCCL_COMMUNICATOR = None
 _TP_CA_COMMUNICATOR = None
 # Pipeline model parallel group that the current rank belongs to.
 _PP_DEVICE_GROUP: Optional[ProcessGroup] = None
+_PP_CPU_GROUP: Optional[ProcessGroup] = None
+_PP_PYNCCL_COMMUNICATOR = None
 
 # when people blindly call `torch.distributed.all_reduce` etc,
 # it will use this group. It is initialized with the `backend`
@@ -52,6 +54,11 @@ def set_custom_all_reduce(enable: bool):
     _ENABLE_CUSTOM_ALL_REDUCE = enable
 
 
+def get_pp_pynccl_communicator():
+    global _PP_PYNCCL_COMMUNICATOR
+    return _PP_PYNCCL_COMMUNICATOR
+
+
 def get_tp_pynccl_communicator():
     global _TP_PYNCCL_COMMUNICATOR
     return _TP_PYNCCL_COMMUNICATOR
@@ -173,10 +180,11 @@ def initialize_model_parallel(
 
     from aphrodite.distributed.device_communicators.pynccl import \
         PyNcclCommunicator
-    _TP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
-        group=_TP_CPU_GROUP,
-        device=_LOCAL_RANK,
-    )
+    if tensor_model_parallel_size > 1:
+        _TP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
+            group=_TP_CPU_GROUP,
+            device=_LOCAL_RANK,
+        )
 
     # Initialize a custom fast all-reduce implementation.
     if _ENABLE_CUSTOM_ALL_REDUCE:
@@ -188,17 +196,26 @@ def initialize_model_parallel(
         )
 
     # Build the pipeline model-parallel groups.
-    global _PP_DEVICE_GROUP
+    global _PP_DEVICE_GROUP, _PP_CPU_GROUP
+    global _PP_PYNCCL_COMMUNICATOR
     global _PP_GLOBAL_RANKS
     assert _PP_DEVICE_GROUP is None, (
         "pipeline model parallel group is already initialized")
     for i in range(num_pipeline_model_parallel_groups):
         ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
         group = torch.distributed.new_group(ranks, backend=backend)
+        cpu_group = torch.distributed.new_group(ranks, backend="gloo")
         if rank in ranks:
             _PP_DEVICE_GROUP = group
+            _PP_CPU_GROUP = cpu_group
             _PP_GLOBAL_RANKS = ranks
 
+    if pipeline_model_parallel_size > 1:
+        _PP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
+            group=_PP_CPU_GROUP,
+            device=_LOCAL_RANK,
+        )
+
 
 def ensure_model_parallel_initialized(
     tensor_model_parallel_size: int,
@@ -260,6 +277,13 @@ def get_pipeline_model_parallel_group():
     return _PP_DEVICE_GROUP
 
 
+def get_pipeline_model_parallel_cpu_group():
+    """Get the pipeline model parallel cpu group the caller rank belongs to."""
+    assert _PP_CPU_GROUP is not None, (
+        "pipeline model parallel cpu group is not initialized")
+    return _PP_CPU_GROUP
+
+
 def get_tensor_model_parallel_world_size():
     """Return world size for the tensor model parallel group."""
     return torch.distributed.get_world_size(
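
In parallel_state.py the commit adds a gloo-backed CPU group per pipeline group and builds the pynccl communicators only when the corresponding parallel size is greater than one. A worked example of the group layout produced by the loop above; the sizes (world_size = 8, tensor parallel = 2, pipeline parallel = 4) are illustrative values, not from the commit.

    world_size = 8
    pipeline_model_parallel_size = 4
    num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size  # 2

    for i in range(num_pipeline_model_parallel_groups):
        # Same stride-based grouping as in initialize_model_parallel().
        ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
        print(ranks)
    # prints [0, 2, 4, 6] and [1, 3, 5, 7]: each pipeline group strides across
    # the tensor-parallel groups, and the calling rank joins exactly one of them.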

+ 4 - 4
aphrodite/engine/output_processor/multi_step.py

@@ -19,10 +19,10 @@ from aphrodite.transformers_utils.detokenizer import Detokenizer
 class MultiStepOutputProcessor(SequenceGroupOutputProcessor):
     """SequenceGroupOutputProcessor which handles logic related to
     detokenization and stopping conditions. It specializes to "multi-step
-    decoding", where vLLM's worker may generate multiple tokens per invocation.
-    This is currently mutually exclusive with advanced sampling techniques like
-    beam search, which motivates the separation of this logic from the single
-    step output processor.
+    decoding", where Aphrodite's worker may generate multiple tokens per
+    invocation. This is currently mutually exclusive with advanced sampling
+    techniques like beam search, which motivates the separation of this logic
+    from the single step output processor.
 
     This class is responsible for things such as correctly appending all new
     token ids to their sequence, detokenizing new token ids, truncating new

+ 1 - 1
aphrodite/executor/multiproc_gpu_executor.py

@@ -27,7 +27,7 @@ class MultiprocessingGPUExecutor(DistributedGPUExecutor):
             os.environ["CUDA_VISIBLE_DEVICES"] = (",".join(
                 map(str, range(world_size))))
 
-        # Ensure that VLLM_INSTANCE_ID is set, to be inherited by workers
+        # Ensure that APHRODITE_INSTANCE_ID is set, to be inherited by workers
         os.environ["APHRODITE_INSTANCE_ID"] = get_aphrodite_instance_id()
 
         from torch.cuda import device_count

+ 1 - 1
aphrodite/lora/request.py

@@ -13,7 +13,7 @@ class LoRARequest:
     accessing unauthorized LoRA adapters.
 
     lora_int_id must be globally unique for a given adapter.
-    This is currently not enforced in vLLM.
+    This is currently not enforced in Aphrodite.
     """
 
     lora_name: str

+ 1 - 1
aphrodite/modeling/guided_decoding/outlines_logits_processors.py

@@ -170,7 +170,7 @@ def _adapt_tokenizer(tokenizer: PreTrainedTokenizerBase):
     def change_decoder(
         decoder: Callable[[List[int]],
                           str]) -> Callable[[List[int]], List[str]]:
-        """Sync vLLM's decoder with the outlines by returning list."""
+        """Sync Aphrodite's decoder with the outlines by returning list."""
 
         def new_decoder(inp_tokens: List[int]) -> List[str]:
             return [decoder(inp_tokens)]

+ 1 - 1
aphrodite/processing/scheduler.py

@@ -1003,7 +1003,7 @@ class Scheduler:
         # Now that the batch has been created, we can assume all blocks in the
         # batch will have been computed before the next scheduling invocation.
         # This is because the engine assumes that a failure in model execution
-        # will crash the vLLM instance / will not retry.
+        # will crash the Aphrodite instance / will not retry.
         for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
             self.block_manager.mark_blocks_as_computed(
                 scheduled_seq_group.seq_group)

+ 2 - 1
aphrodite/quantization/gptq_marlin.py

@@ -378,7 +378,8 @@ class GPTQMarlinLinearMethod(LinearMethodBase):
             layer.marlin_state = GPTQMarlinState.READY
 
             # Newly generated tensors need to replace existing tensors that are
-            # already registered as parameters by vLLM (and won't be freed)
+            # already registered as parameters by Aphrodite (and won't be
+            # freed)
             def replace_tensor(name, new_t):
                 # It is important to use resize_() here since it ensures
                 # the same buffer is reused

+ 8 - 8
aphrodite/spec_decode/spec_decode_worker.py

@@ -126,8 +126,8 @@ class SpecDecodeWorker(LoraNotSupportedWorkerBase):
             proposer_worker: A worker that can produce speculative tokens for
                 sequences.
             scorer_worker: A worker that produces probabilities of speculative
-                tokens according to some base model. Typically a vanilla vLLM
-                Worker.
+                tokens according to some base model. Typically a vanilla
+                Aphrodite Worker.
             rejection_sampler: A Torch module used to perform modified rejection
                 sampling for speculative decoding.
             disable_by_batch_size: If the batch size is larger than this,
@@ -187,8 +187,8 @@ class SpecDecodeWorker(LoraNotSupportedWorkerBase):
         iterations in a row without incurring the "move to CPU and serialize"
         performance penalty.
 
-        Since this requires a large change to vLLM, we defer it to later and
-        temporarily accept this broken abstraction boundary.
+        Since this requires a large change to Aphrodite, we defer it to later
+        and temporarily accept this broken abstraction boundary.
 
         NOTE: This will require a special check if the proposer worker
         does not have a sampler (e.g. ngram speculation).
@@ -403,10 +403,10 @@ class SpecDecodeWorker(LoraNotSupportedWorkerBase):
         """
         proposal_lens_list = proposals.proposal_lens.tolist()
 
-        # vLLM currently only supports proposal lens equal to zero or the batch
-        # proposal len. This adds some complexity (splitting the batch into spec
-        # and non spec sequences) and should be removed in the future. It can be
-        # done by supporting per-sequence proposal lens.
+        # Aphrodite currently only supports proposal lens equal to zero or the
+        # batch proposal len. This adds some complexity (splitting the batch
+        # into spec and non spec sequences) and should be removed in the
+        # future. It can be done by supporting per-sequence proposal lens.
         _, spec_indices = split_batch_by_proposal_len(
             seq_group_metadata_list,
             proposal_lens_list,

+ 3 - 2
aphrodite/spec_decode/util.py

@@ -79,8 +79,9 @@ def create_sequence_group_output(
         topk_token_ids (List[int]): The list of top-k token ids.
         topk_logprobs (List[float]): The list of top-k logprobs.
     """
-    # vLLM logprobs always include the sampled token. In addition, the user may
-    # request topk-logprobs (where top-k varies per user up to max_logprobs).
+    # Aphrodite logprobs always include the sampled token. In addition, the
+    # user may request topk-logprobs (where top-k varies per user up to
+    # max_logprobs).
     logprobs: Dict[int, Logprob] = {
         token_id: Logprob(
             logprob=token_id_logprob,

+ 2 - 2
aphrodite/task_handler/model_runner.py

@@ -479,7 +479,7 @@ class ModelRunner:
 
         # If cuda graph can be used, pad tensors accordingly.
         # See `capture_model` API for more details.
-        # vLLM uses cuda graph only for decoding requests.
+        # Aphrodite uses cuda graph only for decoding requests.
         use_captured_graph = (
             decode_only and not self.model_config.enforce_eager
             and batch_size <= _BATCH_SIZES_TO_CAPTURE[-1]
@@ -859,7 +859,7 @@ class ModelRunner:
         Note that CUDA graph's performance gain is negligible if number
         of batched tokens are larger than 200. And since CUDA graph
         requires fixed sized tensors, supporting large/variable batch
-        size requires high GPU memory overhead. Thus, vLLM only captures
+        size requires high GPU memory overhead. Thus, Aphrodite only captures
         decoding requests. Mixed batch (chunked prefill + decoding) or
         prefill requests are not captured.