test_pynccl.py

import multiprocessing
import os
from typing import Dict, List

import pytest
import torch
import torch.distributed

from aphrodite.common.utils import update_environment_variables
from aphrodite.distributed.communication_op import (  # noqa
    tensor_model_parallel_all_reduce)
from aphrodite.distributed.device_communicators.pynccl import (
    PyNcclCommunicator)
from aphrodite.distributed.device_communicators.pynccl_wrapper import (
    NCCLLibrary)
from aphrodite.distributed.parallel_state import (
    ensure_model_parallel_initialized, get_world_group, graph_capture,
    init_distributed_environment)


def distributed_run(fn, world_size):
    number_of_processes = world_size
    processes: List[multiprocessing.Process] = []
    for i in range(number_of_processes):
        env: Dict[str, str] = {}
        env['RANK'] = str(i)
        env['LOCAL_RANK'] = str(i)
        env['WORLD_SIZE'] = str(number_of_processes)
        env['LOCAL_WORLD_SIZE'] = str(number_of_processes)
        env['MASTER_ADDR'] = 'localhost'
        env['MASTER_PORT'] = '12345'
        p = multiprocessing.Process(target=fn, args=(env, ))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    for p in processes:
        assert p.exitcode == 0
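
# The harness above runs `fn` once per rank in a separate process, passing the
# usual torchrun-style variables (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR,
# MASTER_PORT) explicitly. Joining each process and asserting `exitcode == 0`
# is what surfaces assertion failures from the workers in the parent test.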


def worker_fn_wrapper(fn):
    # `multiprocessing.Process` cannot accept environment variables directly
    # so we need to pass the environment variables as arguments
    # and update the environment variables in the function
    def wrapped_fn(env):
        update_environment_variables(env)
        local_rank = os.environ['LOCAL_RANK']
        device = torch.device(f"cuda:{local_rank}")
        torch.cuda.set_device(device)
        init_distributed_environment()
        fn()

    return wrapped_fn
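
# Each worker body below is decorated with `worker_fn_wrapper`, which applies
# the per-rank environment, pins the process to `cuda:{LOCAL_RANK}`, and calls
# `init_distributed_environment()` before invoking the actual test body.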


@worker_fn_wrapper
def worker_fn():
    pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
                                     device=get_world_group().device)
    tensor = torch.ones(16, 1024, 1024,
                        dtype=torch.float32).cuda(pynccl_comm.rank)
    with pynccl_comm.change_state(enable=True):
        pynccl_comm.all_reduce(tensor)
    result = tensor.mean().cpu().item()
    assert result == pynccl_comm.world_size


@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
def test_pynccl():
    distributed_run(worker_fn, 2)
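
# All-reducing an all-ones tensor sums the contribution of every rank, so each
# element (and therefore the mean) equals the world size; that is what the
# assertion in `worker_fn` checks when run on 2 GPUs.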


@worker_fn_wrapper
def multiple_allreduce_worker_fn():
    device = torch.device(f"cuda:{torch.distributed.get_rank()}")
    groups = [
        torch.distributed.new_group(ranks=[0, 1], backend="gloo"),
        torch.distributed.new_group(ranks=[2, 3], backend="gloo")
    ]
    group = groups[0] if torch.distributed.get_rank() in [0, 1] else groups[1]
    pynccl_comm = PyNcclCommunicator(group=group, device=device)
    tensor = torch.ones(16, 1024, 1024, dtype=torch.float32, device=device)
    with pynccl_comm.change_state(enable=True):
        # two groups can communicate independently
        if torch.distributed.get_rank() in [0, 1]:
            pynccl_comm.all_reduce(tensor)
            pynccl_comm.all_reduce(tensor)
            result = tensor.mean().cpu().item()
            assert result == 4
        else:
            pynccl_comm.all_reduce(tensor)
            result = tensor.mean().cpu().item()
            assert result == 2


@pytest.mark.skipif(torch.cuda.device_count() < 4,
                    reason="Need at least 4 GPUs to run the test.")
def test_pynccl_multiple_allreduce():
    # this tests pynccl for multiple tp groups, in a standalone way
    # i.e. call `pynccl_comm.all_reduce` directly
    distributed_run(multiple_allreduce_worker_fn, 4)
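
# With 4 ranks split into the groups [0, 1] and [2, 3], each group all-reduces
# over only 2 ranks: the first group reduces twice (1 -> 2 -> 4), the second
# once (1 -> 2), which is exactly what the two asserts above expect.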


@worker_fn_wrapper
def multiple_allreduce_with_aphrodite_worker_fn():
    device = torch.device(f"cuda:{torch.distributed.get_rank()}")
    ensure_model_parallel_initialized(2, 2)
    tensor = torch.ones(16, 1024, 1024, dtype=torch.float32, device=device)
    with graph_capture():
        # two tp groups can communicate independently
        if torch.distributed.get_rank() in [0, 1]:
            tensor = tensor_model_parallel_all_reduce(tensor)
            tensor = tensor_model_parallel_all_reduce(tensor)
            result = tensor.mean().cpu().item()
            assert result == 4
        else:
            tensor = tensor_model_parallel_all_reduce(tensor)
            result = tensor.mean().cpu().item()
            assert result == 2


@pytest.mark.skipif(torch.cuda.device_count() < 4,
                    reason="Need at least 4 GPUs to run the test.")
def test_pynccl_multiple_allreduce_with_aphrodite():
    # this tests pynccl for multiple tp groups, together with aphrodite
    # i.e. call `tensor_model_parallel_all_reduce`
    distributed_run(multiple_allreduce_with_aphrodite_worker_fn, 4)
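
# Same arithmetic as the standalone test above, but routed through Aphrodite's
# `tensor_model_parallel_all_reduce` inside `graph_capture()`, with
# `ensure_model_parallel_initialized(2, 2)` creating two tensor-parallel
# groups of size 2 out of the 4 ranks.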


@worker_fn_wrapper
def worker_fn_with_cudagraph():
    with torch.no_grad():
        graph = torch.cuda.CUDAGraph()
        pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
                                         device=get_world_group().device)
        # run something in the default stream to initialize torch engine
        a = torch.ones((4, 4), device=f'cuda:{pynccl_comm.rank}')
        torch.cuda.synchronize()
        with torch.cuda.graph(
                graph, stream=pynccl_comm.stream), pynccl_comm.change_state(
                    enable=True):
            # operation during the graph capture is recorded but not executed
            # see https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#creating-a-graph-using-stream-capture # noqa
            pynccl_comm.all_reduce(a)
        pynccl_comm.stream.synchronize()
        assert a.mean().cpu().item() == pynccl_comm.world_size**0
        graph.replay()
        pynccl_comm.stream.synchronize()
        assert a.mean().cpu().item() == pynccl_comm.world_size**1


@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
def test_pynccl_with_cudagraph():
    distributed_run(worker_fn_with_cudagraph, 2)
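
# During CUDA graph capture the all-reduce is only recorded, so the tensor is
# still all ones afterwards (mean == world_size**0); the collective actually
# runs when `graph.replay()` is called, after which the mean becomes
# world_size**1.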


@worker_fn_wrapper
def send_recv_worker_fn():
    pynccl_comm = PyNcclCommunicator(get_world_group().cpu_group,
                                     device=get_world_group().device)
    if pynccl_comm.rank == 0:
        tensor = torch.ones(16, 1024, 1024,
                            dtype=torch.float32).cuda(pynccl_comm.rank)
    else:
        tensor = torch.empty(16, 1024, 1024,
                             dtype=torch.float32).cuda(pynccl_comm.rank)
    with pynccl_comm.change_state(enable=True):
        if pynccl_comm.rank == 0:
            pynccl_comm.send(tensor,
                             dst=(pynccl_comm.rank + 1) %
                             pynccl_comm.world_size)
        else:
            pynccl_comm.recv(tensor,
                             src=(pynccl_comm.rank - 1) %
                             pynccl_comm.world_size)
    result = tensor.mean().cpu().item()
    assert result == 1


@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
def test_pynccl_send_recv():
    distributed_run(send_recv_worker_fn, 2)
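
# Point-to-point check: rank 0 sends its all-ones tensor to rank 1, which
# receives into an uninitialized buffer, so both ranks end up asserting a
# mean of 1.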


@worker_fn_wrapper
def multiple_send_recv_worker_fn():
    device = torch.device(f"cuda:{torch.distributed.get_rank()}")
    groups = [
        torch.distributed.new_group(ranks=[0, 2], backend="gloo"),
        torch.distributed.new_group(ranks=[1, 3], backend="gloo")
    ]
    group = groups[0] if torch.distributed.get_rank() in [0, 2] else groups[1]
    pynccl_comm = PyNcclCommunicator(group=group, device=device)
    if torch.distributed.get_rank() == 0:
        tensor = torch.ones(16, 1024, 1024, dtype=torch.float32, device=device)
    elif torch.distributed.get_rank() == 1:
        tensor = 2 * torch.ones(
            16, 1024, 1024, dtype=torch.float32, device=device)
    else:
        tensor = torch.empty(16,
                             1024,
                             1024,
                             dtype=torch.float32,
                             device=device)
    with pynccl_comm.change_state(enable=True):
        if torch.distributed.get_rank() in [0, 1]:
            pynccl_comm.send(tensor,
                             dst=(pynccl_comm.rank + 1) %
                             pynccl_comm.world_size)
        else:
            pynccl_comm.recv(tensor,
                             src=(pynccl_comm.rank - 1) %
                             pynccl_comm.world_size)
    result = tensor.mean().cpu().item()
    if torch.distributed.get_rank() in [0, 2]:
        assert result == 1
    else:
        assert result == 2


@pytest.mark.skipif(torch.cuda.device_count() < 4,
                    reason="Need at least 4 GPUs to run the test.")
def test_pynccl_multiple_send_recv():
    distributed_run(multiple_send_recv_worker_fn, 4)
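
# Two independent send/recv pairs: rank 0 sends ones to rank 2 (group [0, 2])
# and rank 1 sends twos to rank 3 (group [1, 3]), so the receivers observe
# means of 1 and 2 respectively.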


def test_ncclGetUniqueId():
    lib = NCCLLibrary()
    unique_id = lib.ncclGetUniqueId()
    # `list(unique_id.internal)` is something like this:
    # [34, -16, 23, 83, 109, -19, 59, 95, 2, 0, -86, 55, 10, -128, 0, 29, 0,
    #  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    #  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    #  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    #  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    #  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    # as long as the function doesn't raise an exception, we're good
    assert unique_id is not None