tpu_communicator.py

import os

import torch
import torch.distributed as dist
from torch.distributed import ProcessGroup

from aphrodite.platforms import current_platform

if current_platform.is_tpu():
    import torch_xla.core.xla_model as xm
    import torch_xla.runtime as xr
    from torch_xla._internal import pjrt

    from aphrodite.executor import ray_utils


class TpuCommunicator:

    def __init__(self, group: ProcessGroup):
        if not current_platform.is_tpu():
            self.disabled = True
            return
        self.disabled = False

        # NOTE: When using TP > 1 on TPUs, every TPU on the same node
        # must be used together. Therefore, the local rank and world
        # size can simply be calculated as follows.
        global_rank = dist.get_rank(group)
        global_world_size = dist.get_world_size(group)

        # Calculate how many TPU nodes are in the current deployment. If
        # deployed with Ray, this is the number of nodes in the Ray
        # placement group; otherwise it defaults to the number of TPU
        # nodes in the Ray cluster. The number of TPU nodes is computed
        # as the total number of TPUs divided by the number of TPU
        # accelerators per node, to account for clusters that contain
        # both CPU and TPU nodes.
        num_nodes = ray_utils.get_num_tpu_nodes()
        num_nodes_in_pg = ray_utils.get_num_nodes_in_placement_group()
        if num_nodes_in_pg > 0:
            num_nodes = num_nodes_in_pg

        local_world_size = global_world_size // num_nodes
        local_rank = global_rank % local_world_size
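
        # Illustrative example (numbers are hypothetical): with 2 TPU nodes
        # of 4 chips each, global_world_size == 8 and num_nodes == 2, so
        # local_world_size == 4; the worker with global_rank == 5 then gets
        # local_rank == 5 % 4 == 1, i.e. the second chip on its node.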

        # Ensure environment variables are set for multihost deployments.
        # On GKE, this is needed for libtpu and the TPU driver to know which
        # TPU chip is actually visible. Otherwise the TPU driver will fail to
        # initialize because the number of devices would differ from the
        # number of visible worker addresses.
        os.environ["CLOUD_TPU_TASK_ID"] = str(global_rank)
        os.environ["TPU_VISIBLE_CHIPS"] = str(local_rank)
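
        # Initialize the PJRT runtime for multi-process execution on this
        # host; xr._init_world_size_ordinal() is a private torch_xla helper
        # that records the world size and process ordinal for later use by
        # the XLA runtime.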
        pjrt.initialize_multiprocess(local_rank, local_world_size)
        xr._init_world_size_ordinal()

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        return xm.all_reduce(xm.REDUCE_SUM, x)

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        assert dim == -1, "TPUs only support dim=-1 for all-gather."
        return xm.all_gather(x, dim=dim)
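
# Usage sketch (illustrative, not part of the module's public API): assuming
# torch.distributed has already been initialized across all TPU workers and
# `group` is the corresponding ProcessGroup, a tensor-parallel worker might
# use the communicator roughly as follows; the variable names are hypothetical.
#
#     comm = TpuCommunicator(group)
#     if not comm.disabled:
#         hidden_states = comm.all_reduce(hidden_states)  # element-wise sum across workers
#         logits = comm.all_gather(logits, dim=-1)        # concatenate along the last dim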