import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup

from aphrodite.platforms import current_platform

# torch_xla is only available (and only needed) on TPU hosts.
if current_platform.is_tpu():
    import torch_xla.core.xla_model as xm
    import torch_xla.runtime as xr
    from torch_xla._internal import pjrt


class TpuCommunicator:

    def __init__(self, group: ProcessGroup):
        # On non-TPU platforms the communicator is a no-op.
        if not current_platform.is_tpu():
            self.disabled = True
            return
        self.disabled = False

        # Use the rank and size of the given process group to bring up
        # the PJRT multiprocess runtime that backs the XLA collectives.
        local_rank = dist.get_rank(group)
        world_size = dist.get_world_size(group)
        pjrt.initialize_multiprocess(local_rank, world_size)
        xr._init_world_size_ordinal()

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        # Element-wise sum of the tensor across all TPU workers.
        return xm.all_reduce(xm.REDUCE_SUM, x)

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        assert dim == -1, "TPUs only support dim=-1 for all-gather."
        # Gather and concatenate the tensor along the last dimension.
        return xm.all_gather(x, dim=dim)
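

# Example usage (a minimal sketch, not part of the original module).
# It assumes a TPU host where torch.distributed has already been
# initialized, e.g. via dist.init_process_group(); the group choice
# and tensor shape below are illustrative only.
#
#     group = dist.group.WORLD
#     comm = TpuCommunicator(group)
#     if not comm.disabled:
#         x = torch.ones(4, device=xm.xla_device())
#         summed = comm.all_reduce(x)     # element-wise sum over all workers
#         gathered = comm.all_gather(x)   # concatenated along the last dim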