- """Make sure ray assigns GPU workers to the correct node.
- Run:
- ```sh
- cd $APHRODITE_PATH/tests
- pytest distributed/test_multi_node_assignment.py
- ```
- """

import os

import pytest
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from aphrodite import initialize_ray_cluster
from aphrodite.common.config import ParallelConfig
from aphrodite.common.utils import get_ip
from aphrodite.executor.ray_utils import _wait_until_pg_removed
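
# Opt-in gate: this test needs a live Ray cluster spanning at least two
# nodes, so it only runs when the flag below is set explicitly.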
APHRODITE_MULTI_NODE = os.getenv("APHRODITE_MULTI_NODE", "0") == "1"


@pytest.mark.skipif(not APHRODITE_MULTI_NODE,
                    reason="Need at least 2 nodes to run the test.")
def test_multi_node_assignment() -> None:

    # NOTE: important to keep this class definition here
    # to let ray use cloudpickle to serialize it.
    class Actor:

        def get_ip(self):
            return get_ip()
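
    # Repeat the whole allocation several times: placement decisions can
    # vary between runs, so a single pass could succeed by coincidence.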
    for _ in range(10):
        config = ParallelConfig(1, 2)
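        # (pipeline_parallel_size=1, tensor_parallel_size=2: the resulting
        # placement group should contain two GPU bundles.)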
        initialize_ray_cluster(config)

        current_ip = get_ip()
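        # Launch one actor per GPU bundle of the placement group (skipping
        # CPU-only bundles) and check that Ray schedules every one of them
        # onto this node, i.e. the node the driver runs on.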
        workers = []
        for bundle_id, bundle in enumerate(
                config.placement_group.bundle_specs):
            if not bundle.get("GPU", 0):
                continue
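            # Pin the actor to this specific bundle;
            # placement_group_capture_child_tasks makes any tasks or actors
            # it spawns inherit the same placement group.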
            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=config.placement_group,
                placement_group_capture_child_tasks=True,
                placement_group_bundle_index=bundle_id,
            )

            worker = ray.remote(
                num_cpus=0,
                num_gpus=1,
                scheduling_strategy=scheduling_strategy,
            )(Actor).remote()
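            # The actor reports the IP of the node it actually landed on;
            # it must match the driver's IP.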
            worker_ip = ray.get(worker.get_ip.remote())
            assert worker_ip == current_ip
            workers.append(worker)
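
        # Tear down the actors and wait until the placement group is fully
        # removed before the next iteration re-creates it.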
        for worker in workers:
            ray.kill(worker)

        _wait_until_pg_removed(config.placement_group)