communication_op.py

from collections import namedtuple
from typing import Any, Dict, List, Optional, Union

import torch
from torch.distributed import ProcessGroup

from .parallel_state import (
    get_tensor_model_parallel_group,
    get_tensor_model_parallel_rank,
    get_tensor_model_parallel_world_size,
    is_pynccl_enabled_for_all_reduce,
)


def tensor_model_parallel_all_reduce(input_: torch.Tensor) -> torch.Tensor:
    """All-reduce the input tensor across model parallel group.

    NOTE: This operation will be applied in-place on the input tensor if
    disable_custom_all_reduce is set to True. Otherwise, this operation may or
    may not be applied in place depending on whether custom all reduce is
    invoked for a particular tensor, which further depends on the tensor size
    and GPU topology.

    TLDR: always assume this function modifies its input, but use the return
    value as the output.
    """
    from aphrodite.distributed.device_communicators import pynccl_utils
    from aphrodite.distributed.device_communicators.custom_all_reduce import (
        custom_all_reduce)

    # Bypass the function if we are using only 1 GPU.
    if get_tensor_model_parallel_world_size() == 1:
        return input_
    out = custom_all_reduce(input_)
    if out is not None:
        return out
    if is_pynccl_enabled_for_all_reduce():
        # TODO: support multiple parallel groups.
        pynccl_utils.all_reduce(input_)
    else:
        torch.distributed.all_reduce(input_,
                                     group=get_tensor_model_parallel_group())
    return input_
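

# Hedged usage sketch (not part of the original module): how a row-parallel
# linear layer would typically consume the helper above. The names `x` and
# `weight_shard` are illustrative assumptions; torch.distributed and the
# tensor-parallel groups are assumed to be initialized already.
def _example_row_parallel_matmul(x: torch.Tensor,
                                 weight_shard: torch.Tensor) -> torch.Tensor:
    # Each rank multiplies by its shard of the weight, producing a partial sum.
    partial = torch.matmul(x, weight_shard)
    # Sum the partial results across the tensor-parallel group. Per the
    # docstring above, always use the return value: the reduction may or may
    # not happen in place.
    return tensor_model_parallel_all_reduce(partial)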


def tensor_model_parallel_all_gather(input_: torch.Tensor,
                                     dim: int = -1) -> torch.Tensor:
    """All-gather the input tensor across model parallel group."""
    world_size = get_tensor_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_
    assert -input_.dim() <= dim < input_.dim(), (
        f"Invalid dim ({dim}) for input tensor with shape {input_.size()}")
    if dim < 0:
        # Convert negative dim to positive.
        dim += input_.dim()
    input_size = input_.size()
    # Allocate output tensor.
    output_tensor = torch.empty((world_size, ) + input_size,
                                dtype=input_.dtype,
                                device=input_.device)
    # All-gather.
    torch.distributed.all_gather_into_tensor(
        output_tensor, input_, group=get_tensor_model_parallel_group())
    # Reshape.
    output_tensor = output_tensor.movedim(0, dim)
    output_tensor = output_tensor.reshape(input_size[:dim] +
                                          (world_size * input_size[dim], ) +
                                          input_size[dim + 1:])
    return output_tensor
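

# Hedged usage sketch (not part of the original module): gathering the output
# slices of a column-parallel layer along the last dimension. `partial_out` is
# an illustrative name; with tensor-parallel world size N and a per-rank shape
# of (batch, hidden // N), the returned tensor has shape (batch, hidden).
def _example_gather_column_parallel_output(
        partial_out: torch.Tensor) -> torch.Tensor:
    # Every rank receives the full concatenated tensor (it is an all-gather).
    return tensor_model_parallel_all_gather(partial_out, dim=-1)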


def tensor_model_parallel_gather(input_: torch.Tensor,
                                 dst: int = 0,
                                 dim: int = -1) -> torch.Tensor:
    """Gather the input tensor across model parallel group.

    NOTE: We assume that the input tensor is on the same device across
    all the ranks.
    """
    world_size = get_tensor_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_
    assert -input_.dim() <= dim < input_.dim(), (
        f"Invalid dim ({dim}) for input tensor with shape {input_.size()}")
    if dim < 0:
        # Convert negative dim to positive.
        dim += input_.dim()
    # Allocate output tensor.
    if get_tensor_model_parallel_rank() == dst:
        gather_list = [torch.empty_like(input_) for _ in range(world_size)]
    else:
        gather_list = None
    # Gather.
    torch.distributed.gather(input_,
                             gather_list,
                             dst=dst,
                             group=get_tensor_model_parallel_group())
    if get_tensor_model_parallel_rank() == dst:
        output_tensor = torch.cat(gather_list, dim=dim)
    else:
        output_tensor = None
    return output_tensor
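

# Hedged usage sketch (not part of the original module): collecting per-rank
# logit shards onto a single rank, e.g. before sampling. Only the destination
# rank receives the concatenated tensor; every other rank gets None, so
# callers must branch on the return value. `logits` and dst=0 are illustrative
# assumptions.
def _example_gather_logits(logits: torch.Tensor) -> Optional[torch.Tensor]:
    gathered = tensor_model_parallel_gather(logits, dst=0, dim=-1)
    # `gathered` is None on all ranks except dst.
    return gathered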


def broadcast(input_: torch.Tensor,
              src: int = 0,
              group: Optional[ProcessGroup] = None):
    """Broadcast the input tensor."""
    group = group or torch.distributed.group.WORLD
    ranks = torch.distributed.get_process_group_ranks(group)
    assert src in ranks, f"Invalid src rank ({src})"

    # Bypass the function if we are using only 1 GPU.
    world_size = torch.distributed.get_world_size(group=group)
    if world_size == 1:
        return input_
    # Broadcast.
    torch.distributed.broadcast(input_, src=src, group=group)
    return input_


def broadcast_object_list(obj_list: List[Any],
                          src: int = 0,
                          group: Optional[ProcessGroup] = None):
    """Broadcast the input object list."""
    group = group or torch.distributed.group.WORLD
    ranks = torch.distributed.get_process_group_ranks(group)
    assert src in ranks, f"Invalid src rank ({src})"

    # Bypass the function if we are using only 1 GPU.
    world_size = torch.distributed.get_world_size(group=group)
    if world_size == 1:
        return obj_list
    # Broadcast.
    torch.distributed.broadcast_object_list(obj_list, src=src, group=group)
    return obj_list


TensorMetadata = namedtuple("TensorMetadata", ["dtype", "size"])


def broadcast_tensor_dict(
    tensor_dict: Optional[Dict[Any, Union[torch.Tensor, Any]]] = None,
    src: int = 0,
    group: Optional[ProcessGroup] = None,
) -> Dict[Any, Union[torch.Tensor, Any]]:
    """Broadcast the input tensor dictionary."""
    group = group or torch.distributed.group.WORLD
    ranks = torch.distributed.get_process_group_ranks(group)
    assert src in ranks, f"Invalid src rank ({src})"

    # Bypass the function if we are using only 1 GPU.
    world_size = torch.distributed.get_world_size(group=group)
    if world_size == 1:
        return tensor_dict

    rank = torch.distributed.get_rank()
    if rank == src:
        assert isinstance(
            tensor_dict,
            dict), (f"Expecting a dictionary, got {type(tensor_dict)}")
        metadata_list = []
        for key, value in tensor_dict.items():
            if isinstance(value, torch.Tensor):
                assert value.is_cuda, (
                    f"Tensor {key}: {value} is not on cuda. Currently we only "
                    f"support broadcasting tensors on cuda.")
                metadata_list.append(
                    (key, TensorMetadata(value.dtype, value.size())))
            else:
                metadata_list.append((key, value))
        # Broadcast the metadata first so the other ranks know which tensors
        # to expect and can allocate matching receive buffers.
        torch.distributed.broadcast_object_list([metadata_list],
                                                src=src,
                                                group=group)
        async_handles = []
        for key, value in metadata_list:
            if isinstance(value, TensorMetadata):
                tensor = tensor_dict[key]
                async_handles.append(
                    torch.distributed.broadcast(tensor,
                                                src=src,
                                                group=group,
                                                async_op=True))
        for async_handle in async_handles:
            async_handle.wait()
    else:
        recv_metadata_list = [None]
        torch.distributed.broadcast_object_list(recv_metadata_list,
                                                src=src,
                                                group=group)
        metadata_list = recv_metadata_list[0]
        tensor_dict = {}
        async_handles = []
        for key, value in metadata_list:  # pylint: disable=not-an-iterable
            if isinstance(value, TensorMetadata):
                # Allocate a receive buffer matching the broadcast metadata.
                tensor = torch.empty(value.size,
                                     dtype=value.dtype,
                                     device="cuda")
                async_handle = torch.distributed.broadcast(tensor,
                                                           src=src,
                                                           async_op=True,
                                                           group=group)
                async_handles.append(async_handle)
                tensor_dict[key] = tensor
            else:
                tensor_dict[key] = value
        for async_handle in async_handles:
            async_handle.wait()
    return tensor_dict
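

# Hedged usage sketch (not part of the original module): a driver/worker
# pattern around broadcast_tensor_dict. The source rank passes the real
# dictionary; every other rank passes nothing and receives a reconstructed
# copy. The key names and shapes are illustrative assumptions, and the tensor
# values must live on CUDA, as the assertion above requires.
def _example_exchange_model_inputs(is_driver: bool) -> Dict[Any, Any]:
    if is_driver:
        inputs = {
            "input_ids": torch.zeros(8, 128, dtype=torch.long, device="cuda"),
            "seq_len": 128,  # non-tensor values ride along in the metadata
        }
        return broadcast_tensor_dict(inputs, src=0)
    # Non-source ranks pass no dict: the metadata list arrives first, then
    # each tensor is received into a freshly allocated CUDA buffer.
    return broadcast_tensor_dict(src=0)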