ray_xpu_executor.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435
  1. import asyncio
  2. from typing import List, Optional
  3. import aphrodite.common.envs as envs
  4. from aphrodite.common.utils import get_aphrodite_instance_id, make_async
  5. from aphrodite.executor.ray_gpu_executor import (RayGPUExecutor,
  6. RayGPUExecutorAsync)
  7. from aphrodite.executor.xpu_executor import XPUExecutor
  8. class RayXPUExecutor(RayGPUExecutor, XPUExecutor):
  9. def _get_env_vars_to_be_updated(self):
  10. # Get the set of GPU IDs used on each node.
  11. worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
  12. use_dummy_driver=True)
  13. APHRODITE_INSTANCE_ID = get_aphrodite_instance_id()
  14. # Set environment variables for the driver and workers.
  15. all_args_to_update_environment_variables = [({
  16. "APHRODITE_INSTANCE_ID":
  17. APHRODITE_INSTANCE_ID,
  18. "APHRODITE_TRACE_FUNCTION":
  19. str(envs.APHRODITE_TRACE_FUNCTION),
  20. }, ) for (_, _) in worker_node_and_gpu_ids]
  21. return all_args_to_update_environment_variables
  22. class RayXPUExecutorAsync(RayXPUExecutor, RayGPUExecutorAsync):
  23. def __init__(self, *args, **kwargs):
  24. super().__init__(*args, **kwargs)
  25. self.driver_exec_method = make_async(self.driver_worker.execute_method)
  26. self.pp_locks: Optional[List[asyncio.Lock]] = None