  1. """Test the communication operators.
  2. Run `pytest tests/distributed/test_comm_ops.py`.
  3. """
  4. import os
  5. import pytest
  6. import ray
  7. import torch
  8. from aphrodite.distributed import (broadcast_tensor_dict, get_pp_group,
  9. tensor_model_parallel_all_gather,
  10. tensor_model_parallel_all_reduce)
  11. from ..utils import init_test_distributed_environment, multi_process_parallel
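# Each tensor-parallel rank scales an arange tensor by (rank + 1); after the
# all-reduce every rank should hold the elementwise sum over the TP group.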
@ray.remote(num_gpus=1, max_calls=1)
def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int,
                           distributed_init_port: str):
    # It is important to delete the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and set its device to the
    # correct one.
    del os.environ["CUDA_VISIBLE_DEVICES"]
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)
    num_elements = 8
    all_tensors = [
        torch.arange(num_elements, dtype=torch.float32, device="cuda") *
        (r + 1) for r in range(tp_size)
    ]
    expected = torch.sum(torch.stack(all_tensors, dim=0), dim=0)
    t = all_tensors[rank % tp_size]
    t = tensor_model_parallel_all_reduce(t)
    torch.testing.assert_close(t, expected)

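# For each dimension of a small 2x3x4 tensor, gather every rank's scaled copy
# and compare against torch.cat along that dimension.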
@ray.remote(num_gpus=1, max_calls=1)
def all_gather_test_worker(tp_size: int, pp_size: int, rank: int,
                           distributed_init_port: str):
    # It is important to delete the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and set its device to the
    # correct one.
    del os.environ["CUDA_VISIBLE_DEVICES"]
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)
    num_dimensions = 3
    tensor_size = list(range(2, num_dimensions + 2))
    total_size = 1
    for s in tensor_size:
        total_size *= s
    for all_gather_dimension in range(num_dimensions):
        all_tensors = [
            torch.arange(total_size, dtype=torch.float32,
                         device="cuda").reshape(tensor_size) * (r + 1)
            for r in range(tp_size)
        ]
        expected = torch.cat(all_tensors, dim=all_gather_dimension)
        t = all_tensors[rank % tp_size]
        t = tensor_model_parallel_all_gather(t, all_gather_dimension)
        torch.testing.assert_close(t, expected)

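# Broadcast a heterogeneous dict (GPU tensor, CPU tensor, str, list, nested
# dict, empty tensor) from TP rank 0 and verify every field on the receivers.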
@ray.remote(num_gpus=1, max_calls=1)
def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
                                      distributed_init_port: str):
    # It is important to delete the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and set its device to the
    # correct one.
    del os.environ["CUDA_VISIBLE_DEVICES"]
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)
    test_dict = {
        # device tensor
        "a": torch.arange(8, dtype=torch.float32, device="cuda"),
        # CPU tensor
        "b": torch.arange(16, dtype=torch.int8, device="cpu"),
        "c": "test",
        "d": [1, 2, 3],
        "e": {
            "a": 1,
            "b": 2
        },
        # empty tensor
        "f": torch.tensor([], dtype=torch.float32, device="cuda"),
    }

    if (rank % tp_size) == 0:
        broadcast_tensor_dict(test_dict, src=0)
    else:
        recv_dict = broadcast_tensor_dict(src=0)
        assert len(recv_dict) == len(test_dict)
        torch.testing.assert_close(recv_dict["a"], test_dict["a"])
        torch.testing.assert_close(recv_dict["b"], test_dict["b"])
        assert recv_dict["c"] == test_dict["c"]
        assert recv_dict["d"] == test_dict["d"]
        assert recv_dict["e"] == test_dict["e"]
        torch.testing.assert_close(recv_dict["f"], test_dict["f"])

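# The same heterogeneous dict is sent hop-by-hop along the pipeline-parallel
# group: every rank except the last sends, every rank except the first
# receives and verifies.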
@ray.remote(num_gpus=1, max_calls=1)
def send_recv_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
                                      distributed_init_port: str):
    del os.environ["CUDA_VISIBLE_DEVICES"]
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    test_dict = {
        # device tensor
        "a": torch.arange(8, dtype=torch.float32, device="cuda"),
        # CPU tensor
        "b": torch.arange(16, dtype=torch.int8, device="cpu"),
        "c": "test",
        "d": [1, 2, 3],
        "e": {
            "a": 1,
            "b": 2
        },
        # empty tensor
        "f": torch.tensor([], dtype=torch.float32, device="cuda"),
    }

    if not get_pp_group().is_first_rank:
        recv_dict = get_pp_group().recv_tensor_dict()
    if not get_pp_group().is_last_rank:
        get_pp_group().send_tensor_dict(test_dict)
    if not get_pp_group().is_first_rank:
        assert len(recv_dict) == len(test_dict)
        torch.testing.assert_close(recv_dict["a"], test_dict["a"])
        torch.testing.assert_close(recv_dict["b"], test_dict["b"])
        assert recv_dict["c"] == test_dict["c"]
        assert recv_dict["d"] == test_dict["d"]
        assert recv_dict["e"] == test_dict["e"]
        torch.testing.assert_close(recv_dict["f"], test_dict["f"])

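# Point-to-point transfer of a single tensor along the pipeline-parallel
# group; the receiver passes the expected size and dtype explicitly.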
@ray.remote(num_gpus=1, max_calls=1)
def send_recv_test_worker(tp_size: int, pp_size: int, rank: int,
                          distributed_init_port: str):
    del os.environ["CUDA_VISIBLE_DEVICES"]
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    size = 64
    test_tensor = torch.arange(64, dtype=torch.float32, device="cuda")

    if not get_pp_group().is_first_rank:
        recv_tensor = get_pp_group().recv(size, dtype=torch.float32)
    if not get_pp_group().is_last_rank:
        get_pp_group().send(test_tensor)
    if not get_pp_group().is_first_rank:
        torch.testing.assert_close(test_tensor, recv_tensor)

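# multi_process_parallel (imported from ..utils) presumably spawns one Ray
# worker per rank for the given tp_size/pp_size and waits for all of them to
# finish, so each parametrized case below runs the worker on every rank.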
@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
@pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("test_target", [
    all_reduce_test_worker, all_gather_test_worker,
    broadcast_tensor_dict_test_worker
])
def test_multi_process_tensor_parallel(tp_size, test_target):
    multi_process_parallel(tp_size, 1, test_target)


@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
@pytest.mark.parametrize("pp_size", [2])
@pytest.mark.parametrize(
    "test_target", [send_recv_test_worker, send_recv_tensor_dict_test_worker])
def test_multi_process_pipeline_parallel(pp_size, test_target):
    multi_process_parallel(1, pp_size, test_target)


@pytest.mark.skipif(torch.cuda.device_count() < 4,
                    reason="Need at least 4 GPUs to run the test.")
@pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("pp_size", [2])
@pytest.mark.parametrize("test_target", [
    send_recv_test_worker, send_recv_tensor_dict_test_worker,
    all_reduce_test_worker, all_gather_test_worker,
    broadcast_tensor_dict_test_worker
])
def test_multi_process_tensor_parallel_pipeline_parallel(
        tp_size, pp_size, test_target):
    multi_process_parallel(tp_size, pp_size, test_target)