tpu_communicator.py

import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup

from aphrodite.platforms import current_platform

if current_platform.is_tpu():
    import ray
    import torch_xla.core.xla_model as xm
    import torch_xla.runtime as xr
    from torch_xla._internal import pjrt


class TpuCommunicator:

    def __init__(self, group: ProcessGroup):
        # On non-TPU platforms the communicator is a no-op.
        if not current_platform.is_tpu():
            self.disabled = True
            return
        self.disabled = False

        # NOTE: When using TP > 1 on TPUs, every TPU on the same node
        # must be used together. Therefore, the local rank and world
        # size can be simply calculated as follows.
        global_rank = dist.get_rank(group)
        global_world_size = dist.get_world_size(group)
        num_nodes = len(ray.nodes())
        local_world_size = global_world_size // num_nodes
        local_rank = global_rank % local_world_size
        pjrt.initialize_multiprocess(local_rank, local_world_size)
        xr._init_world_size_ordinal()

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        # Sum-reduce x element-wise across all TPU chips in the group.
        return xm.all_reduce(xm.REDUCE_SUM, x)

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        # Concatenate each chip's shard of x along the last dimension.
        assert dim == -1, "TPUs only support dim=-1 for all-gather."
        return xm.all_gather(x, dim=dim)
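
As a worked illustration of the local-rank arithmetic in __init__, here is a minimal sketch assuming a hypothetical cluster of 2 nodes with 4 TPU chips each; these counts are illustrative and not defined by the file:

    # Illustrative values only: 2 nodes x 4 chips = 8 workers in the group.
    num_nodes = 2
    global_world_size = 8
    local_world_size = global_world_size // num_nodes  # 4 chips per node

    # Global rank 5 sits on the second node; within that node it is chip 1.
    global_rank = 5
    local_rank = global_rank % local_world_size         # 5 % 4 == 1
    assert (local_world_size, local_rank) == (4, 1)

This (local_rank, local_world_size) pair is what gets passed to pjrt.initialize_multiprocess; the arithmetic only holds because, as the NOTE states, every TPU on a node must be used together.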