123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- """Test the communication operators.
- Run `pytest tests/distributed/test_comm_ops.py`.
- """
- import os
- import pytest
- import ray
- import torch
- from aphrodite.distributed import (broadcast_tensor_dict, get_pp_group,
- tensor_model_parallel_all_gather,
- tensor_model_parallel_all_reduce)
- from ..utils import init_test_distributed_environment, multi_process_parallel
@ray.remote(num_gpus=1, max_calls=1)
def all_reduce_test_worker(tp_size: int, pp_size: int, rank: int,
                           distributed_init_port: str):
    """Check that the tensor-parallel all-reduce sums the per-rank tensors.

    Each of the ``tp_size`` positions owns ``arange(8) * (position + 1)``.
    This worker contributes the tensor at position ``rank % tp_size`` and
    asserts that the reduced result equals the element-wise sum of all of
    them.

    Args:
        tp_size: Tensor-parallel world size.
        pp_size: Pipeline-parallel world size.
        rank: Rank of this worker; also used as its CUDA device index.
        distributed_init_port: Port forwarded to
            ``init_test_distributed_environment``.
    """
    # It is important to remove the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and pick the correct one by
    # rank. ``pop`` with a default is used instead of ``del`` so the worker
    # does not crash with KeyError when the variable is not set.
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    num_elements = 8
    all_tensors = [
        torch.arange(num_elements, dtype=torch.float32, device="cuda") *
        (r + 1) for r in range(tp_size)
    ]
    # Reference result computed locally: sum over the stacked inputs.
    expected = torch.sum(torch.stack(all_tensors, dim=0), dim=0)

    t = all_tensors[rank % tp_size]
    t = tensor_model_parallel_all_reduce(t)
    torch.testing.assert_close(t, expected)
@ray.remote(num_gpus=1, max_calls=1)
def all_gather_test_worker(tp_size: int, pp_size: int, rank: int,
                           distributed_init_port: str):
    """Check the tensor-parallel all-gather along every dimension.

    A 3-D tensor of shape ``[2, 3, 4]`` is built for each of the ``tp_size``
    positions (scaled by ``position + 1``). For each dimension in turn, this
    worker contributes the tensor at position ``rank % tp_size`` and asserts
    that the gathered result equals ``torch.cat`` of all tensors along that
    dimension.

    Args:
        tp_size: Tensor-parallel world size.
        pp_size: Pipeline-parallel world size.
        rank: Rank of this worker; also used as its CUDA device index.
        distributed_init_port: Port forwarded to
            ``init_test_distributed_environment``.
    """
    # It is important to remove the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and pick the correct one by
    # rank. ``pop`` with a default is used instead of ``del`` so the worker
    # does not crash with KeyError when the variable is not set.
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    num_dimensions = 3
    tensor_size = list(range(2, num_dimensions + 2))
    total_size = 1
    for s in tensor_size:
        total_size *= s

    for all_gather_dimension in range(num_dimensions):
        # Fresh inputs are built for every dimension under test.
        all_tensors = [
            torch.arange(total_size, dtype=torch.float32,
                         device="cuda").reshape(tensor_size) * (r + 1)
            for r in range(tp_size)
        ]
        expected = torch.cat(all_tensors, dim=all_gather_dimension)
        t = all_tensors[rank % tp_size]
        t = tensor_model_parallel_all_gather(t, all_gather_dimension)
        torch.testing.assert_close(t, expected)
@ray.remote(num_gpus=1, max_calls=1)
def broadcast_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
                                      distributed_init_port: str):
    """Check that ``broadcast_tensor_dict`` delivers a mixed dict intact.

    The dict contains a CUDA tensor, a CPU tensor, a string, a list, a nested
    dict and an empty CUDA tensor. Workers with ``rank % tp_size == 0``
    broadcast it with ``src=0``; all other workers receive it and compare
    every entry against their locally built copy.

    Args:
        tp_size: Tensor-parallel world size.
        pp_size: Pipeline-parallel world size.
        rank: Rank of this worker; also used as its CUDA device index.
        distributed_init_port: Port forwarded to
            ``init_test_distributed_environment``.
    """
    # It is important to remove the CUDA_VISIBLE_DEVICES environment variable
    # so that each worker can see all the GPUs and pick the correct one by
    # rank. ``pop`` with a default is used instead of ``del`` so the worker
    # does not crash with KeyError when the variable is not set.
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    test_dict = {
        # device tensor
        "a": torch.arange(8, dtype=torch.float32, device="cuda"),
        # CPU tensor
        "b": torch.arange(16, dtype=torch.int8, device="cpu"),
        "c": "test",
        "d": [1, 2, 3],
        "e": {
            "a": 1,
            "b": 2
        },
        # empty tensor
        "f": torch.tensor([], dtype=torch.float32, device="cuda"),
    }

    if (rank % tp_size) == 0:
        # Sender side: nothing to verify locally.
        broadcast_tensor_dict(test_dict, src=0)
    else:
        recv_dict = broadcast_tensor_dict(src=0)
        assert len(recv_dict) == len(test_dict)
        torch.testing.assert_close(recv_dict["a"], test_dict["a"])
        torch.testing.assert_close(recv_dict["b"], test_dict["b"])
        assert recv_dict["c"] == test_dict["c"]
        assert recv_dict["d"] == test_dict["d"]
        assert recv_dict["e"] == test_dict["e"]
        torch.testing.assert_close(recv_dict["f"], test_dict["f"])
@ray.remote(num_gpus=1, max_calls=1)
def send_recv_tensor_dict_test_worker(tp_size: int, pp_size: int, rank: int,
                                      distributed_init_port: str):
    """Check ``send_tensor_dict`` / ``recv_tensor_dict`` on the PP group.

    Every worker that is not the first rank of its pipeline group receives a
    dict; every worker that is not the last rank sends its locally built
    dict. Receivers then compare each entry against their own copy.

    Args:
        tp_size: Tensor-parallel world size.
        pp_size: Pipeline-parallel world size.
        rank: Rank of this worker; also used as its CUDA device index.
        distributed_init_port: Port forwarded to
            ``init_test_distributed_environment``.
    """
    # Remove CUDA_VISIBLE_DEVICES so that each worker can see all the GPUs
    # and pick the correct one by rank. ``pop`` with a default is used
    # instead of ``del`` so the worker does not crash with KeyError when the
    # variable is not set.
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    test_dict = {
        # device tensor
        "a": torch.arange(8, dtype=torch.float32, device="cuda"),
        # CPU tensor
        "b": torch.arange(16, dtype=torch.int8, device="cpu"),
        "c": "test",
        "d": [1, 2, 3],
        "e": {
            "a": 1,
            "b": 2
        },
        # empty tensor
        "f": torch.tensor([], dtype=torch.float32, device="cuda"),
    }

    # Order matters: receive first, then send, and only verify afterwards so
    # that a failing assertion cannot prevent the send from being issued.
    if not get_pp_group().is_first_rank:
        recv_dict = get_pp_group().recv_tensor_dict()

    if not get_pp_group().is_last_rank:
        get_pp_group().send_tensor_dict(test_dict)

    if not get_pp_group().is_first_rank:
        assert len(recv_dict) == len(test_dict)
        torch.testing.assert_close(recv_dict["a"], test_dict["a"])
        torch.testing.assert_close(recv_dict["b"], test_dict["b"])
        assert recv_dict["c"] == test_dict["c"]
        assert recv_dict["d"] == test_dict["d"]
        assert recv_dict["e"] == test_dict["e"]
        torch.testing.assert_close(recv_dict["f"], test_dict["f"])
@ray.remote(num_gpus=1, max_calls=1)
def send_recv_test_worker(tp_size: int, pp_size: int, rank: int,
                          distributed_init_port: str):
    """Check plain tensor ``send`` / ``recv`` on the PP group.

    Every worker that is not the first rank of its pipeline group receives a
    float32 tensor of ``size`` elements; every worker that is not the last
    rank sends ``arange(size)``. Receivers then compare what they got with
    their own locally built tensor.

    Args:
        tp_size: Tensor-parallel world size.
        pp_size: Pipeline-parallel world size.
        rank: Rank of this worker; also used as its CUDA device index.
        distributed_init_port: Port forwarded to
            ``init_test_distributed_environment``.
    """
    # Remove CUDA_VISIBLE_DEVICES so that each worker can see all the GPUs
    # and pick the correct one by rank. ``pop`` with a default is used
    # instead of ``del`` so the worker does not crash with KeyError when the
    # variable is not set.
    os.environ.pop("CUDA_VISIBLE_DEVICES", None)
    device = torch.device(f"cuda:{rank}")
    torch.cuda.set_device(device)
    init_test_distributed_environment(tp_size, pp_size, rank,
                                      distributed_init_port)

    size = 64
    # Build the payload from ``size`` so the sent length and the length
    # requested by ``recv`` cannot drift apart.
    test_tensor = torch.arange(size, dtype=torch.float32, device="cuda")

    # Order matters: receive first, then send, and only verify afterwards.
    if not get_pp_group().is_first_rank:
        recv_tensor = get_pp_group().recv(size, dtype=torch.float32)

    if not get_pp_group().is_last_rank:
        get_pp_group().send(test_tensor)

    if not get_pp_group().is_first_rank:
        torch.testing.assert_close(test_tensor, recv_tensor)
@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
@pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("test_target", [
    all_reduce_test_worker,
    all_gather_test_worker,
    broadcast_tensor_dict_test_worker,
])
def test_multi_process_tensor_parallel(tp_size, test_target):
    """Run each tensor-parallel worker across ``tp_size`` processes.

    Skipped when fewer than 2 CUDA devices are available.
    """
    # The second positional argument is fixed to 1 here; the combined TP+PP
    # test in this file passes ``pp_size`` in that slot.
    pipeline_stages = 1
    multi_process_parallel(tp_size, pipeline_stages, test_target)
@pytest.mark.skipif(torch.cuda.device_count() < 2,
                    reason="Need at least 2 GPUs to run the test.")
@pytest.mark.parametrize("pp_size", [2])
@pytest.mark.parametrize("test_target", [
    send_recv_test_worker,
    send_recv_tensor_dict_test_worker,
])
def test_multi_process_pipeline_parallel(pp_size, test_target):
    """Run each send/recv worker across ``pp_size`` processes.

    Skipped when fewer than 2 CUDA devices are available.
    """
    # The first positional argument is fixed to 1 here; the combined TP+PP
    # test in this file passes ``tp_size`` in that slot.
    tensor_parallel_ranks = 1
    multi_process_parallel(tensor_parallel_ranks, pp_size, test_target)
@pytest.mark.skipif(torch.cuda.device_count() < 4,
                    reason="Need at least 4 GPUs to run the test.")
@pytest.mark.parametrize("tp_size", [2])
@pytest.mark.parametrize("pp_size", [2])
@pytest.mark.parametrize("test_target", [
    send_recv_test_worker,
    send_recv_tensor_dict_test_worker,
    all_reduce_test_worker,
    all_gather_test_worker,
    broadcast_tensor_dict_test_worker,
])
def test_multi_process_tensor_parallel_pipeline_parallel(
        tp_size, pp_size, test_target):
    """Run every worker with both tensor and pipeline parallelism enabled.

    Skipped when fewer than 4 CUDA devices are available.
    """
    launch_args = (tp_size, pp_size, test_target)
    multi_process_parallel(*launch_args)
|