- # Copyright 2023 The PygmalionAI team.
- # Copyright 2023 The vLLM team.
- # Adapted from
- # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py
- # Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
- """Tensor and pipeline parallel groups."""
- import os
- from typing import List, Optional
- import torch
- from loguru import logger
- from torch.distributed import ProcessGroup
- _ENABLE_CUSTOM_ALL_REDUCE = True
- # Tensor model parallel group that the current rank belongs to.
- _TP_DEVICE_GROUP: Optional[ProcessGroup] = None
- _TP_CPU_GROUP: Optional[ProcessGroup] = None
- _TP_PYNCCL_COMMUNICATOR = None
- _TP_CA_COMMUNICATOR = None
- # Pipeline model parallel group that the current rank belongs to.
- _PP_DEVICE_GROUP: Optional[ProcessGroup] = None
- _PP_CPU_GROUP: Optional[ProcessGroup] = None
- _PP_PYNCCL_COMMUNICATOR = None
- # When users call `torch.distributed.all_reduce` etc. without specifying a
- # group, this group is used. It is initialized with the `backend`
- # parameter of `init_distributed_environment` below.
- # Essentially, this is `torch.distributed.group.WORLD`.
- # We keep a named reference here to note that this group is device-specific.
- # Note that this variable is not safe to rely on: if users call
- # `init_distributed_environment` first and then destroy the process group
- # themselves, this variable keeps a dangling reference to the destroyed
- # process group.
- _DEVICE_WORLD_GROUP = None
- # during `init_distributed_environment`, we will also initialize a
- # group with the `gloo` backend, to allow direct coordination between
- # processes through the CPU.
- _CPU_WORLD_GROUP = None
- # In summary, after calling `init_distributed_environment`, we will
- # always have two groups: one for device-specific (and is the default)
- # and one for CPU. All processes will be part of both groups.
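- # Illustrative sketch (not part of the original module): once
- # `init_distributed_environment` has run, the `gloo`-backed CPU group can be
- # used for lightweight coordination that never touches the GPU, e.g.
- # broadcasting a small Python object from one rank to all ranks. The helper
- # below is hypothetical and only shown as an example of using the CPU group.
- def _example_cpu_broadcast(obj, src: int = 0):
-     """Broadcast a small Python object over the CPU (gloo) world group."""
-     buf = [obj if torch.distributed.get_rank() == src else None]
-     torch.distributed.broadcast_object_list(buf, src=src,
-                                             group=get_cpu_world_group())
-     return buf[0]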
- # A list of global ranks for each pipeline group to ease calculation of the
- # source rank when broadcasting from the first or last pipeline stage.
- _PP_GLOBAL_RANKS: Optional[List[int]] = None
- _LOCAL_RANK = -1
- def set_custom_all_reduce(enable: bool):
- global _ENABLE_CUSTOM_ALL_REDUCE
- _ENABLE_CUSTOM_ALL_REDUCE = enable
- def get_pp_pynccl_communicator():
- global _PP_PYNCCL_COMMUNICATOR
- return _PP_PYNCCL_COMMUNICATOR
- def get_tp_pynccl_communicator():
- global _TP_PYNCCL_COMMUNICATOR
- return _TP_PYNCCL_COMMUNICATOR
- def get_tp_ca_communicator():
- global _TP_CA_COMMUNICATOR
- return _TP_CA_COMMUNICATOR
- def get_local_rank():
- global _LOCAL_RANK
- return _LOCAL_RANK
- def init_distributed_environment(
- world_size: int = -1,
- rank: int = -1,
- distributed_init_method: str = "env://",
- local_rank: int = -1,
- backend: str = "nccl",
- ):
- logger.debug(f"{world_size=} {rank=} {local_rank=} "
- f"{distributed_init_method=} {backend=}")
- if not torch.distributed.is_initialized():
- assert distributed_init_method is not None, (
- "distributed_init_method must be provided when initializing "
- "distributed environment")
- # this backend is used for WORLD
- torch.distributed.init_process_group(
- backend=backend,
- init_method=distributed_init_method,
- world_size=world_size,
- rank=rank)
- global _DEVICE_WORLD_GROUP, _CPU_WORLD_GROUP
- _DEVICE_WORLD_GROUP = torch.distributed.group.WORLD
- ranks = list(range(torch.distributed.get_world_size()))
- _CPU_WORLD_GROUP = torch.distributed.new_group(ranks=ranks,
- backend="gloo")
- if local_rank == -1:
- # local rank not set; this usually happens in a single-node
- # setting, where the global rank can be used as the local rank
- if distributed_init_method == "env://":
- local_rank = int(os.environ.get("LOCAL_RANK", rank))
- else:
- local_rank = rank
- global _LOCAL_RANK
- _LOCAL_RANK = local_rank
- # A small all_reduce for warmup.
- data = torch.zeros(1)
- if torch.cuda.is_available():
- data = data.to(device=f"cuda:{local_rank}")
- torch.distributed.all_reduce(data)
- if torch.cuda.is_available():
- torch.cuda.synchronize()
- del data
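- # Illustrative sketch (assumption: the process was launched with `torchrun`,
- # which sets WORLD_SIZE, RANK and LOCAL_RANK in the environment). A typical
- # call site simply forwards those values; with "env://" the process group
- # reads the rendezvous information from the same environment variables.
- # This helper is hypothetical and not part of the module's API.
- def _example_init_from_env():
-     init_distributed_environment(
-         world_size=int(os.environ["WORLD_SIZE"]),
-         rank=int(os.environ["RANK"]),
-         local_rank=int(os.environ["LOCAL_RANK"]),
-         distributed_init_method="env://",
-         backend="nccl" if torch.cuda.is_available() else "gloo",
-     )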
- def initialize_model_parallel(
- tensor_model_parallel_size: int = 1,
- pipeline_model_parallel_size: int = 1,
- backend: Optional[str] = None,
- ) -> None:
- """
- Initialize model parallel groups.
- Arguments:
- tensor_model_parallel_size: number of GPUs used for tensor model
- parallelism.
- pipeline_model_parallel_size: number of GPUs used for pipeline model
- parallelism.
- Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
- use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
- the model pipeline. The present function will
- create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
- 4 tensor model-parallel groups:
- [g0, g1], [g2, g3], [g4, g5], [g6, g7]
- 2 pipeline model-parallel groups:
- [g0, g2, g4, g6], [g1, g3, g5, g7]
- Note that for efficiency, the caller should make sure adjacent ranks
- are on the same DGX box. For example, if we are using 2 DGX-1 boxes
- with a total of 16 GPUs, ranks 0 to 7 belong to the first box and
- ranks 8 to 15 belong to the second box.
- """
- # Get world size and rank. Ensure some consistencies.
- assert torch.distributed.is_initialized()
- world_size: int = torch.distributed.get_world_size()
- # get the backend of _DEVICE_WORLD_GROUP
- backend = backend or torch.distributed.get_backend()
- if (world_size !=
- tensor_model_parallel_size * pipeline_model_parallel_size):
- raise RuntimeError(
- f"world_size ({world_size}) is not equal to "
- f"tensor_model_parallel_size ({tensor_model_parallel_size}) x "
- f"pipeline_model_parallel_size ({pipeline_model_parallel_size})")
- num_tensor_model_parallel_groups: int = (world_size //
- tensor_model_parallel_size)
- num_pipeline_model_parallel_groups: int = (world_size //
- pipeline_model_parallel_size)
- rank = torch.distributed.get_rank()
- # Build the tensor model-parallel groups.
- global _TP_DEVICE_GROUP, _TP_CPU_GROUP
- global _TP_PYNCCL_COMMUNICATOR, _TP_CA_COMMUNICATOR
- assert _TP_DEVICE_GROUP is None, (
- "tensor model parallel group is already initialized")
- for i in range(num_tensor_model_parallel_groups):
- ranks = list(
- range(i * tensor_model_parallel_size,
- (i + 1) * tensor_model_parallel_size))
- group = torch.distributed.new_group(ranks, backend=backend)
- cpu_group = torch.distributed.new_group(ranks, backend="gloo")
- if rank in ranks:
- _TP_DEVICE_GROUP = group
- _TP_CPU_GROUP = cpu_group
- from aphrodite.distributed.device_communicators.pynccl import \
- PyNcclCommunicator
- if tensor_model_parallel_size > 1:
- _TP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
- group=_TP_CPU_GROUP,
- device=_LOCAL_RANK,
- )
- # Initialize a custom fast all-reduce implementation.
- if _ENABLE_CUSTOM_ALL_REDUCE:
- from aphrodite.distributed.device_communicators.custom_all_reduce \
- import CustomAllreduce
- _TP_CA_COMMUNICATOR = CustomAllreduce(
- group=_TP_CPU_GROUP,
- device=_LOCAL_RANK,
- )
- # Build the pipeline model-parallel groups.
- global _PP_DEVICE_GROUP, _PP_CPU_GROUP
- global _PP_PYNCCL_COMMUNICATOR
- global _PP_GLOBAL_RANKS
- assert _PP_DEVICE_GROUP is None, (
- "pipeline model parallel group is already initialized")
- for i in range(num_pipeline_model_parallel_groups):
- ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
- group = torch.distributed.new_group(ranks, backend=backend)
- cpu_group = torch.distributed.new_group(ranks, backend="gloo")
- if rank in ranks:
- _PP_DEVICE_GROUP = group
- _PP_CPU_GROUP = cpu_group
- _PP_GLOBAL_RANKS = ranks
- if pipeline_model_parallel_size > 1:
- _PP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
- group=_PP_CPU_GROUP,
- device=_LOCAL_RANK,
- )
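- # Illustrative sketch (not part of the original module): the rank layout from
- # the docstring of `initialize_model_parallel` above, computed without any
- # torch.distributed calls. For world_size=8, tp=2, pp=4 this returns
- # TP groups [[0, 1], [2, 3], [4, 5], [6, 7]] and
- # PP groups [[0, 2, 4, 6], [1, 3, 5, 7]], matching the loops above.
- def _example_group_layout(world_size: int, tp: int, pp: int):
-     """Return (tensor_parallel_groups, pipeline_parallel_groups) as rank lists."""
-     assert world_size == tp * pp
-     num_pp_groups = world_size // pp
-     tp_groups = [list(range(i * tp, (i + 1) * tp))
-                  for i in range(world_size // tp)]
-     pp_groups = [list(range(i, world_size, num_pp_groups))
-                  for i in range(num_pp_groups)]
-     return tp_groups, pp_groups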
- def ensure_model_parallel_initialized(
- tensor_model_parallel_size: int,
- pipeline_model_parallel_size: int,
- backend: Optional[str] = None,
- ) -> None:
- """Helper to initialize model parallel groups if they are not initialized,
- or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
- values if the model parallel groups are initialized.
- """
- # get the backend of _DEVICE_WORLD_GROUP
- backend = backend or torch.distributed.get_backend()
- if not model_parallel_is_initialized():
- initialize_model_parallel(tensor_model_parallel_size,
- pipeline_model_parallel_size, backend)
- return
- assert (
- get_tensor_model_parallel_world_size() == tensor_model_parallel_size
- ), ("tensor parallel group already initialized, but of unexpected size: "
- f"{get_tensor_model_parallel_world_size()=} vs. "
- f"{tensor_model_parallel_size=}")
- assert (get_pipeline_model_parallel_world_size(
- ) == pipeline_model_parallel_size), (
- "pipeline parallel group already initialized, but of unexpected size: "
- f"{get_pipeline_model_parallel_world_size()=} vs. "
- f"{pipeline_model_parallel_size=}")
- def model_parallel_is_initialized():
- """Check if tensor and pipeline parallel groups are initialized."""
- return (_TP_DEVICE_GROUP is not None and _PP_DEVICE_GROUP is not None)
- def get_cpu_world_group():
- """Get the CPU world group."""
- assert _CPU_WORLD_GROUP is not None, ("CPU world group is not initialized")
- return _CPU_WORLD_GROUP
- def get_tensor_model_parallel_group():
- """Get the tensor model parallel group the caller rank belongs to."""
- assert _TP_DEVICE_GROUP is not None, (
- "tensor model parallel group is not initialized")
- return _TP_DEVICE_GROUP
- def get_tensor_model_parallel_cpu_group():
- """Get the tensor model parallel cpu group the caller rank belongs to."""
- assert _TP_CPU_GROUP is not None, (
- "tensor model parallel cpu group is not initialized")
- return _TP_CPU_GROUP
- def get_pipeline_model_parallel_group():
- """Get the pipeline model parallel group the caller rank belongs to."""
- assert _PP_DEVICE_GROUP is not None, (
- "pipeline model parallel group is not initialized")
- return _PP_DEVICE_GROUP
- def get_pipeline_model_parallel_cpu_group():
- """Get the pipeline model parallel cpu group the caller rank belongs to."""
- assert _PP_CPU_GROUP is not None, (
- "pipeline model parallel cpu group is not initialized")
- return _PP_CPU_GROUP
- def get_tensor_model_parallel_world_size():
- """Return world size for the tensor model parallel group."""
- return torch.distributed.get_world_size(
- group=get_tensor_model_parallel_group())
- def get_pipeline_model_parallel_world_size():
- """Return world size for the pipeline model parallel group."""
- return torch.distributed.get_world_size(
- group=get_pipeline_model_parallel_group())
- def get_tensor_model_parallel_rank():
- """Return my rank for the tensor model parallel group."""
- return torch.distributed.get_rank(group=get_tensor_model_parallel_group())
- def get_pipeline_model_parallel_rank():
- """Return my rank for the pipeline model parallel group."""
- return torch.distributed.get_rank(
- group=get_pipeline_model_parallel_group())
- def get_tensor_model_parallel_src_rank():
- """Calculate the global rank corresponding to the first local rank
- in the tensor model parallel group."""
- global_rank = torch.distributed.get_rank()
- local_world_size = get_tensor_model_parallel_world_size()
- return (global_rank // local_world_size) * local_world_size
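- # Worked example: with tensor_model_parallel_size == 2 the TP groups are
- # [0, 1], [2, 3], [4, 5], ...; a caller with global rank 5 computes
- # (5 // 2) * 2 == 4, i.e. the first (source) rank of its own TP group.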
- def get_pipeline_model_parallel_first_rank():
- """Return the global rank of the first process in the pipeline for the
- current tensor parallel group"""
- assert _PP_GLOBAL_RANKS is not None, (
- "Pipeline parallel group is not initialized")
- return _PP_GLOBAL_RANKS[0]
- def get_pipeline_model_parallel_last_rank():
- """Return the global rank of the last process in the pipeline for the
- current tensor parallel group"""
- assert _PP_GLOBAL_RANKS is not None, (
- "Pipeline parallel group is not initialized")
- last_rank_local = get_pipeline_model_parallel_world_size() - 1
- return _PP_GLOBAL_RANKS[last_rank_local]
- def get_pipeline_model_parallel_next_rank():
- """Return the global rank that follows the caller in the pipeline"""
- assert _PP_GLOBAL_RANKS is not None, (
- "Pipeline parallel group is not initialized")
- rank_in_pipeline = get_pipeline_model_parallel_rank()
- world_size = get_pipeline_model_parallel_world_size()
- return _PP_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]
- def get_pipeline_model_parallel_prev_rank():
- """Return the global rank that precedes the caller in the pipeline"""
- assert _PP_GLOBAL_RANKS is not None, (
- "Pipeline parallel group is not initialized")
- rank_in_pipeline = get_pipeline_model_parallel_rank()
- world_size = get_pipeline_model_parallel_world_size()
- return _PP_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]
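- # Worked example: with 8 ranks, tp=2 and pp=4, the pipeline group containing
- # global rank 4 is [0, 2, 4, 6]; rank 4 has pipeline rank 2, so its next rank
- # is _PP_GLOBAL_RANKS[3] == 6 and its previous rank is _PP_GLOBAL_RANKS[1] == 2.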
- def destroy_model_parallel():
- """Set the groups to none and destroy them."""
- global _TP_DEVICE_GROUP
- if _TP_DEVICE_GROUP:
- torch.distributed.destroy_process_group(_TP_DEVICE_GROUP)
- _TP_DEVICE_GROUP = None
- global _TP_CPU_GROUP
- if _TP_CPU_GROUP:
- torch.distributed.destroy_process_group(_TP_CPU_GROUP)
- _TP_CPU_GROUP = None
- global _TP_PYNCCL_COMMUNICATOR
- _TP_PYNCCL_COMMUNICATOR = None
- global _TP_CA_COMMUNICATOR
- _TP_CA_COMMUNICATOR = None
- global _PP_DEVICE_GROUP
- if _PP_DEVICE_GROUP:
- torch.distributed.destroy_process_group(_PP_DEVICE_GROUP)
- _PP_DEVICE_GROUP = None
- # also release the PP CPU group and communicators so that repeated
- # init/destroy cycles start from a clean state
- global _PP_CPU_GROUP
- if _PP_CPU_GROUP:
- torch.distributed.destroy_process_group(_PP_CPU_GROUP)
- _PP_CPU_GROUP = None
- global _PP_PYNCCL_COMMUNICATOR
- _PP_PYNCCL_COMMUNICATOR = None
- global _PP_GLOBAL_RANKS
- _PP_GLOBAL_RANKS = None
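- # Illustrative sketch (hypothetical driver code, not part of this module):
- # the expected lifecycle is world init -> model parallel init -> work ->
- # teardown. Assumes WORLD_SIZE and RANK are set in the environment (e.g. by
- # torchrun) so that `init_distributed_environment` can use its defaults.
- def _example_lifecycle(tp_size: int = 1, pp_size: int = 1):
-     init_distributed_environment()
-     ensure_model_parallel_initialized(tp_size, pp_size)
-     # ... build and run the model here ...
-     destroy_model_parallel()
-     torch.distributed.destroy_process_group()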