import contextlib
from typing import Optional

import torch
from loguru import logger
from torch.distributed import ProcessGroup, ReduceOp

try:
    from aphrodite.distributed.device_communicators.pynccl import (
        NCCLCommunicator, ncclGetVersion)
except Exception as e:
    # In non-NVIDIA environments (e.g. machines with AMD GPUs), the NCCL
    # module cannot be imported; log the failure and continue without it.
    logger.info(f"Failed to import NCCL library: {e}")
    logger.info("This is expected if you are not running on NVIDIA GPUs.")

# Module-level singleton communicator; stays None until init_process_group()
# is called.
comm: Optional["NCCLCommunicator"] = None


def is_initialized() -> bool:
    """Returns whether the NCCL backend is initialized."""
    return comm is not None


@contextlib.contextmanager
def set_pynccl_stream(stream: torch.cuda.Stream):
    """Set the cuda stream for communication."""
    assert comm is not None, "NCCL backend is not initialized"
    prev_stream = comm.stream
    try:
        comm.stream = stream
        yield
    finally:
        # Restore the previous stream so the context manager has no
        # lasting side effect.
        comm.stream = prev_stream
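
# A minimal usage sketch; `my_stream` and `tensor` are placeholder names,
# not part of this module:
#
#     my_stream = torch.cuda.Stream()
#     with set_pynccl_stream(my_stream):
#         all_reduce(tensor)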


def init_process_group(group: Optional[ProcessGroup] = None) -> None:
    assert not is_initialized(), "NCCL backend is already initialized"
    global comm
    logger.info(f"Aphrodite is using nccl=={ncclGetVersion()}")
    comm = NCCLCommunicator(group=group)


def all_reduce(input_: torch.Tensor, op=ReduceOp.SUM) -> None:
    """All-reduces the input tensor across the process group."""
    assert input_.is_cuda, f"{input_} should be a cuda tensor"
    comm.all_reduce(input_, op)


def destroy_process_group() -> None:
    """Tears down the NCCL backend by dropping the communicator."""
    global comm
    comm = None


def get_world_size() -> int:
    """Returns the world size."""
    return comm.world_size


def get_nccl_backend():
    """Returns the underlying NCCLCommunicator, or None if uninitialized."""
    return comm
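

# End-to-end sketch, assuming the process was launched with torchrun and
# torch.distributed is already initialized; `tensor` is a placeholder name:
#
#     init_process_group()
#     tensor = torch.ones(16, device="cuda")
#     all_reduce(tensor)  # sums the tensor across all ranks (ReduceOp.SUM)
#     destroy_process_group()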