test_multi_node_assignment.py

  1. """Make sure ray assigns GPU workers to the correct node.
  2. Run:
  3. ```sh
  4. cd $APHRODITE_PATH/tests
  5. pytest distributed/test_multi_node_assignment.py
  6. ```
  7. """
import os

import pytest
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from aphrodite import initialize_ray_cluster
from aphrodite.common.config import ParallelConfig
from aphrodite.common.utils import get_ip
from aphrodite.executor.ray_utils import _wait_until_pg_removed

APHRODITE_MULTI_NODE = os.getenv("APHRODITE_MULTI_NODE", "0") == "1"
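
# The test below runs only when APHRODITE_MULTI_NODE=1 is set, since it needs
# a Ray cluster that spans at least two nodes.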
@pytest.mark.skipif(not APHRODITE_MULTI_NODE,
                    reason="Need at least 2 nodes to run the test.")
def test_multi_node_assignment() -> None:

    # NOTE: important to keep this class definition here
    # to let ray use cloudpickle to serialize it.
    class Actor:

        def get_ip(self):
            return get_ip()
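
    # Repeat the whole assignment several times, presumably to catch any
    # nondeterminism in placement-group creation and actor scheduling.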
    for _ in range(10):
        config = ParallelConfig(1, 2)
        initialize_ray_cluster(config)

        current_ip = get_ip()
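        # One worker actor is launched per GPU bundle in the placement group;
        # bundles that do not reserve a GPU are skipped.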
        workers = []
        for bundle_id, bundle in enumerate(
                config.placement_group.bundle_specs):
            if not bundle.get("GPU", 0):
                continue
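            # Pin the actor to this specific bundle so Ray schedules it on the
            # node that owns the bundle rather than wherever capacity is free.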
            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=config.placement_group,
                placement_group_capture_child_tasks=True,
                placement_group_bundle_index=bundle_id,
            )
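
            # Launch a zero-CPU, one-GPU actor on the bundle and check that it
            # landed on the same node as the driver.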
            worker = ray.remote(
                num_cpus=0,
                num_gpus=1,
                scheduling_strategy=scheduling_strategy,
            )(Actor).remote()
            worker_ip = ray.get(worker.get_ip.remote())
            assert worker_ip == current_ip
            workers.append(worker)

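        # Tear the actors down and wait for the placement group to be removed
        # so the next iteration can create a fresh one.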
        for worker in workers:
            ray.kill(worker)

        _wait_until_pg_removed(config.placement_group)