tpu_communicator.py

import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup

from aphrodite.platforms import current_platform

# torch_xla is only available (and only needed) on TPU hosts.
if current_platform.is_tpu():
    import torch_xla.core.xla_model as xm
    import torch_xla.runtime as xr
    from torch_xla._internal import pjrt


class TpuCommunicator:

    def __init__(self, group: ProcessGroup):
        # On non-TPU platforms the communicator is a disabled no-op.
        if not current_platform.is_tpu():
            self.disabled = True
            return
        self.disabled = False

        # Derive this worker's rank and the world size from the given
        # process group, then bring up the PJRT multiprocess runtime.
        local_rank = dist.get_rank(group)
        world_size = dist.get_world_size(group)
        pjrt.initialize_multiprocess(local_rank, world_size)
        xr._init_world_size_ordinal()

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        # Sum-reduce the tensor across all TPU workers.
        return xm.all_reduce(xm.REDUCE_SUM, x)

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        assert dim == -1, "TPUs only support dim=-1 for all-gather."
        return xm.all_gather(x, dim=dim)
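
For context, a minimal usage sketch. It assumes a torch.distributed process group has already been created by the surrounding engine and that the code runs on a TPU host with torch_xla installed; the function name sync_hidden_states and the caller-side fallback are illustrative, not part of the file above.

# Usage sketch (illustrative, not part of tpu_communicator.py).
import torch
import torch.distributed as dist

def sync_hidden_states(group: dist.ProcessGroup,
                       hidden: torch.Tensor) -> torch.Tensor:
    # TpuCommunicator is the class defined in the file above.
    comm = TpuCommunicator(group)
    if comm.disabled:
        # Not running on TPU: leave the tensor untouched so the caller
        # can fall back to another communication path.
        return hidden
    # Sum the tensor across all TPU workers in the group.
    return comm.all_reduce(hidden)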