ray_utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import time
  2. from collections import defaultdict
  3. from typing import Dict, List, Optional, Tuple, Union
  4. import msgspec
  5. from loguru import logger
  6. from aphrodite.common.config import ParallelConfig
  7. from aphrodite.common.sequence import ExecuteModelRequest, IntermediateTensors
  8. from aphrodite.common.utils import get_ip, is_hip, is_xpu
  9. from aphrodite.executor.msgspec_utils import decode_hook, encode_hook
  10. from aphrodite.platforms import current_platform
  11. from aphrodite.task_handler.worker_base import WorkerWrapperBase
  12. PG_WAIT_TIMEOUT = 1800
  13. try:
  14. import ray
  15. from ray._private.state import available_resources_per_node
  16. from ray.util import placement_group_table
  17. from ray.util.placement_group import PlacementGroup
  18. class RayWorkerWrapper(WorkerWrapperBase):
  19. """Ray wrapper for aphrodite.task_handler.Worker, allowing Worker to be
  20. lazliy initialized after Ray sets CUDA_VISIBLE_DEVICES."""
  21. def __init__(self, *args, **kwargs) -> None:
  22. super().__init__(*args, **kwargs)
  23. # Since the compiled DAG runs a main execution
  24. # in a different thread that calls cuda.set_device.
  25. # The flag indicates is set_device is called on
  26. # that thread.
  27. self.compiled_dag_cuda_device_set = False
  28. self.input_decoder = msgspec.msgpack.Decoder(ExecuteModelRequest,
  29. dec_hook=decode_hook)
  30. self.output_encoder = msgspec.msgpack.Encoder(enc_hook=encode_hook)
  31. def get_node_ip(self) -> str:
  32. return get_ip()
  33. def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]:
  34. node_id = ray.get_runtime_context().get_node_id()
  35. gpu_ids = ray.get_gpu_ids()
  36. return node_id, gpu_ids
  37. def execute_model_spmd(
  38. self, req_or_tuple: Union[bytes,
  39. Tuple[bytes,
  40. Optional[IntermediateTensors]]]
  41. ) -> bytes:
  42. """Execute model in SPMD fashion: used only when SPMD worker and
  43. compiled DAG are both enabled.
  44. Args:
  45. req_or_tuple: A request or a tuple containing the
  46. request and intermediate tensors. Intermediate tensors are
  47. None unless if it is provided because it is > 0 pipeline
  48. stage. The request is serialized by msgspec.
  49. """
  50. if isinstance(req_or_tuple, bytes):
  51. serialized_req, intermediate_tensors = req_or_tuple, None
  52. else:
  53. serialized_req, intermediate_tensors = req_or_tuple
  54. execute_model_req = self.input_decoder.decode(serialized_req)
  55. # TODO: This is needed right now because Ray DAG executes
  56. # on a background thread, so we need to reset torch's current
  57. # device.
  58. import torch
  59. if not self.compiled_dag_cuda_device_set:
  60. torch.cuda.set_device(self.worker.device)
  61. self.compiled_dag_cuda_device_set = True
  62. output = self.worker._execute_model_spmd(execute_model_req,
  63. intermediate_tensors)
  64. # Pipeline model request and output to the next pipeline stage
  65. if isinstance(output, IntermediateTensors):
  66. output = serialized_req, output
  67. else:
  68. output = self.output_encoder.encode(output)
  69. return output
  70. ray_import_err = None
  71. except ImportError as e:
  72. ray = None # type: ignore
  73. ray_import_err = e
  74. RayWorkerWrapper = None # type: ignore
  75. def ray_is_available() -> bool:
  76. """Returns True if Ray is available."""
  77. return ray is not None
  78. def assert_ray_available():
  79. """Raise an exception if Ray is not available."""
  80. if ray is None:
  81. raise ValueError("Failed to import Ray, please install Ray with "
  82. "`pip install ray`.") from ray_import_err
  83. def _verify_bundles(placement_group: "PlacementGroup",
  84. parallel_config: ParallelConfig, device_str: str):
  85. """Verify a given placement group has bundles located in the right place.
  86. There are 2 rules.
  87. - Warn if all tensor parallel workers cannot fit in a single node.
  88. - Fail if driver node is not included in a placement group.
  89. """
  90. assert ray.is_initialized(), (
  91. "Ray is not initialized although distributed-executor-backend is ray.")
  92. pg_data = placement_group_table(placement_group)
  93. # bundle_idx -> node_id
  94. bundle_to_node_ids = pg_data["bundles_to_node_id"]
  95. # bundle_idx -> bundle (e.g., {"GPU": 1})
  96. bundles = pg_data["bundles"]
  97. # node_id -> List of bundle (e.g., {"GPU": 1})
  98. node_id_to_bundle: Dict[str, List[Dict[str, float]]] = defaultdict(list)
  99. for bundle_idx, node_id in bundle_to_node_ids.items():
  100. node_id_to_bundle[node_id].append(bundles[bundle_idx])
  101. driver_node_id = ray.get_runtime_context().get_node_id()
  102. if driver_node_id not in node_id_to_bundle:
  103. raise RuntimeError(
  104. f"driver node id {driver_node_id} is not included in a placement "
  105. f"group {placement_group.id}. Node id -> bundles "
  106. f"{node_id_to_bundle}. "
  107. "You don't have enough GPUs available in a current node. Check "
  108. "`ray status` to see if you have available GPUs in a node "
  109. f"{driver_node_id} before starting an vLLM engine.")
  110. for node_id, bundles in node_id_to_bundle.items():
  111. if len(bundles) < parallel_config.tensor_parallel_size:
  112. logger.warning(
  113. "tensor_parallel_size=%d "
  114. "is bigger than a reserved number of %ss (%d "
  115. "%ss) in a node %s. Tensor parallel workers can be "
  116. "spread out to 2+ nodes which can degrade the performance "
  117. "unless you have fast interconnect across nodes, like "
  118. "Infiniband. To resolve this issue, make sure you have more "
  119. "than %d GPUs available at each node.",
  120. parallel_config.tensor_parallel_size, device_str, len(bundles),
  121. device_str, node_id, parallel_config.tensor_parallel_size)
  122. def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
  123. """Wait until a placement group is ready.
  124. It prints the informative log messages if the placement group is
  125. not created within time.
  126. """
  127. # Wait until PG is ready - this will block until all
  128. # requested resources are available, and will timeout
  129. # if they cannot be provisioned.
  130. placement_group_specs = current_placement_group.bundle_specs
  131. s = time.time()
  132. pg_ready_ref = current_placement_group.ready()
  133. wait_interval = 10
  134. while time.time() - s < PG_WAIT_TIMEOUT:
  135. ready, _ = ray.wait([pg_ready_ref], timeout=wait_interval)
  136. if len(ready) > 0:
  137. break
  138. # Exponential backoff for warning print.
  139. wait_interval *= 2
  140. logger.info(
  141. "Waiting for creating a placement group of specs for "
  142. "%d seconds. specs=%s. Check "
  143. "`ray status` to see if you have enough resources.",
  144. int(time.time() - s), placement_group_specs)
  145. try:
  146. ray.get(pg_ready_ref, timeout=0)
  147. except ray.exceptions.GetTimeoutError:
  148. raise ValueError(
  149. "Cannot provide a placement group of "
  150. f"{placement_group_specs=} within {PG_WAIT_TIMEOUT} seconds. See "
  151. "`ray status` to make sure the cluster has enough resources."
  152. ) from None
  153. def _wait_until_pg_removed(current_placement_group: "PlacementGroup"):
  154. ray.util.remove_placement_group(current_placement_group)
  155. s = time.time()
  156. wait_interval = 10
  157. while time.time() - s < PG_WAIT_TIMEOUT:
  158. pg = ray.util.get_current_placement_group()
  159. if pg is None:
  160. break
  161. # Exponential backoff for warning print.
  162. wait_interval *= 2
  163. logger.info(
  164. "Waiting for removing a placement group of specs for "
  165. "%d seconds.", int(time.time() - s))
  166. time.sleep(wait_interval)
  167. def initialize_ray_cluster(
  168. parallel_config: ParallelConfig,
  169. ray_address: Optional[str] = None,
  170. ):
  171. """Initialize the distributed cluster with Ray.
  172. it will connect to the Ray cluster and create a placement group
  173. for the workers, which includes the specification of the resources
  174. for each distributed worker.
  175. Args:
  176. parallel_config: The configurations for parallel execution.
  177. ray_address: The address of the Ray cluster. If None, uses
  178. the default Ray cluster address.
  179. """
  180. assert_ray_available()
  181. # Connect to a ray cluster.
  182. if is_hip() or is_xpu():
  183. ray.init(address=ray_address,
  184. ignore_reinit_error=True,
  185. num_gpus=parallel_config.world_size)
  186. else:
  187. ray.init(address=ray_address, ignore_reinit_error=True)
  188. if parallel_config.placement_group:
  189. # Placement group is already set.
  190. return
  191. device_str = "GPU" if not current_platform.is_tpu() else "TPU"
  192. # Create placement group for worker processes
  193. current_placement_group = ray.util.get_current_placement_group()
  194. if current_placement_group:
  195. # We are in a placement group
  196. bundles = current_placement_group.bundle_specs
  197. # Verify that we can use the placement group.
  198. device_bundles = 0
  199. for bundle in bundles:
  200. bundle_devices = bundle.get(device_str, 0)
  201. if bundle_devices > 1:
  202. raise ValueError(
  203. "Placement group bundle cannot have more than 1 "
  204. f"{device_str}.")
  205. if bundle_devices:
  206. device_bundles += 1
  207. if parallel_config.world_size > device_bundles:
  208. raise ValueError(
  209. f"The number of required {device_str}s exceeds the total "
  210. f"number of available {device_str}s in the placement group."
  211. f"Required number of devices: {parallel_config.world_size}. "
  212. f"Total number of devices: {device_bundles}.")
  213. else:
  214. num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
  215. if parallel_config.world_size > num_devices_in_cluster:
  216. raise ValueError(
  217. f"The number of required {device_str}s exceeds the total "
  218. f"number of available {device_str}s in the placement group.")
  219. # Create a new placement group
  220. placement_group_specs: List[Dict[str, float]] = ([{
  221. device_str: 1.0
  222. } for _ in range(parallel_config.world_size)])
  223. # Aphrodite engine is also a worker to execute model with an accelerator
  224. # so it requires to have the device in a current node. Check if
  225. # the current node has at least one device.
  226. current_ip = get_ip()
  227. current_node_id = ray.get_runtime_context().get_node_id()
  228. current_node_resource = available_resources_per_node()[current_node_id]
  229. if current_node_resource.get(device_str, 0) < 1:
  230. raise ValueError(
  231. f"Current node has no {device_str} available. "
  232. f"{current_node_resource=}. Aphrodite engine cannot start "
  233. f"without {device_str}. Make sure you have at least 1 "
  234. f"{device_str} available in a node {current_node_id=} "
  235. f"{current_ip=}.")
  236. # This way, at least bundle is required to be created in a current
  237. # node.
  238. placement_group_specs[0][f"node:{current_ip}"] = 0.001
  239. # By default, Ray packs resources as much as possible.
  240. current_placement_group = ray.util.placement_group(
  241. placement_group_specs, strategy="PACK")
  242. _wait_until_pg_ready(current_placement_group)
  243. assert current_placement_group is not None
  244. _verify_bundles(current_placement_group, parallel_config, device_str)
  245. # Set the placement group in the parallel config
  246. parallel_config.placement_group = current_placement_group