worker_base.py 17 KB


  1. import dataclasses
  2. import importlib
  3. import os
  4. from abc import ABC, abstractmethod
  5. from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
  6. import torch
  7. from loguru import logger
  8. from aphrodite.common.sequence import (ExecuteModelRequest,
  9. IntermediateTensors, SamplerOutput)
  10. from aphrodite.common.utils import (enable_trace_function_call_for_thread,
  11. update_environment_variables)
  12. from aphrodite.distributed import (broadcast_tensor_dict, get_pp_group,
  13. get_tp_group)
  14. from aphrodite.lora.request import LoRARequest
  15. from aphrodite.platforms import current_platform
  16. from aphrodite.task_handler.model_runner_base import (BroadcastableModelInput,
  17. ModelRunnerBase,
  18. ModelRunnerInputBase)
  19. class WorkerBase(ABC):
  20. """Worker interface that allows Aphrodite to cleanly separate
  21. implementations for different hardware. Also abstracts control plane
  22. communication, e.g., to communicate request metadata to other workers.
  23. """
  24. @abstractmethod
  25. def init_device(self) -> None:
  26. """Initialize device state, such as loading the model or other on-device
  27. memory allocations.
  28. """
  29. raise NotImplementedError
  30. @abstractmethod
  31. def determine_num_available_blocks(self) -> Tuple[int, int]:
  32. """Determine the number of available blocks for the GPU KV cache and
  33. swappable CPU KV cache.
  34. The implementation may run profiling or other heuristics to determine
  35. the size of caches.
  36. Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
  37. are blocks that are "active" on the device and can be appended to.
  38. num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
  39. appended to.
  40. """
  41. raise NotImplementedError
  42. @abstractmethod
  43. def initialize_cache(self, num_gpu_blocks: int,
  44. num_cpu_blocks: int) -> None:
  45. """Initialize the KV cache with the given size in blocks.
  46. """
  47. raise NotImplementedError
  48. @current_platform.inference_mode()
  49. def start_worker_execution_loop(self) -> None:
  50. """Execute model loop in parallel worker.
  51. You can stop the loop by executing a driver worker with an empty output.
  52. See `stop_remote_worker_execution_loop` for more details.
  53. """
  54. while True:
  55. output = self.execute_model(execute_model_req=None)
  56. if output is None:
  57. return None
  58. @abstractmethod
  59. def execute_model(
  60. self,
  61. execute_model_req: Optional[ExecuteModelRequest] = None
  62. ) -> Optional[List[SamplerOutput]]:
  63. raise NotImplementedError
  64. @abstractmethod
  65. def get_cache_block_size_bytes(self) -> int:
  66. """Return the size of a single cache block, in bytes. Used in
  67. speculative decoding.
  68. """
  69. raise NotImplementedError
  70. @abstractmethod
  71. def add_lora(self, lora_request: LoRARequest) -> bool:
  72. raise NotImplementedError
  73. @abstractmethod
  74. def remove_lora(self, lora_id: int) -> bool:
  75. raise NotImplementedError
  76. @abstractmethod
  77. def pin_lora(self, lora_id: int) -> bool:
  78. raise NotImplementedError
  79. @abstractmethod
  80. def list_loras(self) -> Set[int]:
  81. raise NotImplementedError
  82. class LoraNotSupportedWorkerBase(WorkerBase):
  83. """Partial implementation of WorkerBase that raises exceptions when LoRA
  84. methods are invoked.
  85. """
  86. def add_lora(self, lora_request: LoRARequest) -> bool:
  87. raise ValueError(f"{type(self)} does not support LoRA")
  88. def remove_lora(self, lora_id: int) -> bool:
  89. raise ValueError(f"{type(self)} does not support LoRA")
  90. def pin_lora(self, lora_id: int) -> bool:
  91. return ValueError(
  92. f"{type(self)} does not support LoRA") # type: ignore
  93. def list_loras(self) -> Set[int]:
  94. raise ValueError(f"{type(self)} does not support LoRA")
  95. @dataclasses.dataclass(frozen=True)
  96. class WorkerInput:
  97. """Local inputs to each worker. May contain device-specific data. These
  98. fields should be broadcastable to other workers.
  99. """
  100. num_seq_groups: Optional[int] = None
  101. blocks_to_swap_in: Optional[torch.Tensor] = None
  102. blocks_to_swap_out: Optional[torch.Tensor] = None
  103. blocks_to_copy: Optional[torch.Tensor] = None
  104. virtual_engine: int = 0
  105. num_steps: int = 1
  106. @classmethod
  107. def from_broadcasted_tensor_dict(
  108. cls: Type["WorkerInput"],
  109. tensor_dict: Dict[str, Any],
  110. ) -> "WorkerInput":
  111. """
  112. Pop fields from the given tensor_dict and populate a new instance of
  113. WorkerInput.
  114. """
  115. return cls(
  116. num_seq_groups=tensor_dict.pop("num_seq_groups"),
  117. blocks_to_swap_in=tensor_dict.pop("blocks_to_swap_in"),
  118. blocks_to_swap_out=tensor_dict.pop("blocks_to_swap_out"),
  119. blocks_to_copy=tensor_dict.pop("blocks_to_copy"),
  120. virtual_engine=tensor_dict["virtual_engine"],
  121. num_steps=tensor_dict.pop("num_steps"),
  122. )
  123. def as_broadcastable_tensor_dict(
  124. self) -> Dict[str, Union[int, torch.Tensor]]:
  125. """
  126. Extract broadcastable fields.
  127. """
  128. tensor_dict = {
  129. "num_seq_groups": self.num_seq_groups,
  130. "blocks_to_swap_in": self.blocks_to_swap_in,
  131. "blocks_to_swap_out": self.blocks_to_swap_out,
  132. "blocks_to_copy": self.blocks_to_copy,
  133. "virtual_engine": self.virtual_engine,
  134. "num_steps": self.num_steps,
  135. }
  136. return tensor_dict
  137. class LocalOrDistributedWorkerBase(WorkerBase):
  138. """
  139. Partial implementation of WorkerBase that has a default `execute_model`
  140. definition to perform metadata transfer between workers when in distributed
  141. mode. Subclasses of this interface should use model runners that inherit
  142. from ModelRunnerBase, and should only need to implement worker-local logic.
  143. If custom control plane logic is needed to transfer metadata, or if the
  144. model runner cannot inherit from ModelRunnerBase, use WorkerBase instead.
  145. """
  146. is_driver_worker: bool
  147. model_runner: ModelRunnerBase
  148. @property
  149. @abstractmethod
  150. def do_metadata_broadcast(self) -> bool:
  151. """
  152. Used by the default `execute_model` to check whether broadcast is
  153. needed to transfer request inputs from the driver worker to other
  154. workers in the TP group. If WorkerBase subclass only supports
  155. single-worker execution, then this method should return False.
  156. """
  157. raise NotImplementedError
  158. @property
  159. @abstractmethod
  160. def kv_cache(self) -> Optional[List[List[torch.Tensor]]]:
  161. """
  162. Gets the list of kv caches to pass to the worker's model runner. Each
  163. element in the list is a kv cache corresponding to a particular virtual
  164. engine (PP stream). Used by the default `execute_model`. If the worker's
  165. model runner does not follow the ModelRunnerBase interface, then inherit
  166. from WorkerBase instead.
  167. """
  168. raise NotImplementedError
  169. @abstractmethod
  170. def prepare_worker_input(
  171. self, execute_model_req: ExecuteModelRequest) -> WorkerInput:
  172. """
  173. Prepare the inputs to WorkerBase.execute_worker from an execution
  174. request. This method may move data to the worker's local device. It is
  175. not allowed to communicate with other workers or devices.
  176. """
  177. raise NotImplementedError
  178. @abstractmethod
  179. def execute_worker(self, worker_input: WorkerInput) -> None:
  180. """
  181. Process an execution request.
  182. """
  183. raise NotImplementedError
  184. def _get_worker_input_from_broadcast(
  185. self
  186. ) -> Optional[Tuple[BroadcastableModelInput, WorkerInput, Dict[
  187. str, torch.Tensor]]]:
  188. """ Get the worker input from the broadcasted tensor dict. """
  189. assert self.do_metadata_broadcast
  190. assert not self.is_driver_worker
  191. broadcast_data = broadcast_tensor_dict(src=0)
  192. if not broadcast_data:
  193. return None
  194. worker_input = WorkerInput.from_broadcasted_tensor_dict(broadcast_data)
  195. model_input = (
  196. self.model_runner.make_model_input_from_broadcasted_tensor_dict(
  197. broadcast_data))
  198. kwargs = extract_previous_hidden_states(broadcast_data)
  199. return model_input, worker_input, kwargs
  200. def _get_driver_input_and_broadcast(
  201. self, execute_model_req: ExecuteModelRequest
  202. ) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]:
  203. """ Get the driver input and broadcast it to other workers. """
  204. assert self.is_driver_worker
  205. worker_input: WorkerInput = self.prepare_worker_input(
  206. execute_model_req=execute_model_req)
  207. model_input: ModelRunnerInputBase = (
  208. self.model_runner.prepare_model_input(
  209. execute_model_req.seq_group_metadata_list,
  210. execute_model_req.virtual_engine,
  211. execute_model_req.finished_requests_ids))
  212. kwargs = extract_previous_hidden_states(execute_model_req)
  213. if self.do_metadata_broadcast:
  214. broadcast_data = worker_input.as_broadcastable_tensor_dict()
  215. broadcast_data.update(model_input.as_broadcastable_tensor_dict())
  216. broadcast_data.update(kwargs)
  217. broadcast_tensor_dict(broadcast_data, src=0)
  218. if execute_model_req.output_proc_callback_fn:
  219. model_input = dataclasses.replace( # type: ignore
  220. model_input,
  221. output_proc_callback_fn=execute_model_req.
  222. output_proc_callback_fn)
  223. return model_input, worker_input, kwargs
  224. def prepare_input(
  225. self,
  226. execute_model_req: Optional[ExecuteModelRequest] = None
  227. ) -> Optional[Tuple[BroadcastableModelInput, WorkerInput, Dict[
  228. str, torch.Tensor]]]:
  229. """
  230. Prepare the inputs to ModelRunner and workers.
  231. """
  232. if self.is_driver_worker:
  233. if execute_model_req is None:
  234. if self.do_metadata_broadcast:
  235. # This signals that there's no more requests to process for
  236. # now. All workers are running infinite loop with
  237. # broadcast_tensor_dict, and it stops the loop when the
  238. # driver broadcasts an empty input. Send an empty input to
  239. # notify all other workers to stop their execution loop.
  240. broadcast_tensor_dict({}, src=0)
  241. return None
  242. return self._get_driver_input_and_broadcast(execute_model_req)
  243. else:
  244. return self._get_worker_input_from_broadcast()
  245. def execute_model(
  246. self,
  247. execute_model_req: Optional[ExecuteModelRequest] = None
  248. ) -> Optional[List[SamplerOutput]]:
  249. """Executes at least one model step on the given sequences, unless no
  250. sequences are provided."""
  251. inputs = self.prepare_input(execute_model_req)
  252. if inputs is None:
  253. return None
  254. model_input, worker_input, kwargs = inputs
  255. num_steps = worker_input.num_steps
  256. self.execute_worker(worker_input)
  257. # If there is no input, we don't need to execute the model.
  258. if worker_input.num_seq_groups == 0:
  259. return []
  260. intermediate_tensors = None
  261. if not get_pp_group().is_first_rank:
  262. intermediate_tensors = IntermediateTensors(
  263. get_pp_group().recv_tensor_dict(
  264. all_gather_group=get_tp_group()))
  265. output = self.model_runner.execute_model(
  266. model_input=model_input,
  267. kv_caches=self.kv_cache[worker_input.virtual_engine]
  268. if self.kv_cache is not None else None,
  269. intermediate_tensors=intermediate_tensors,
  270. num_steps=num_steps,
  271. **kwargs,
  272. )
  273. if not get_pp_group().is_last_rank:
  274. # output is IntermediateTensors
  275. get_pp_group().send_tensor_dict(output.tensors,
  276. all_gather_group=get_tp_group())
  277. return [None]
  278. # output is List[SamplerOutput]
  279. return output
  280. def _execute_model_spmd(
  281. self,
  282. execute_model_req: ExecuteModelRequest,
  283. intermediate_tensors: Optional[IntermediateTensors] = None
  284. ) -> Optional[List[SamplerOutput]]:
  285. """
  286. Execute model in Single Program Multiple Data (SPMD) fashion.
  287. All workers take the same request, prepare the input and
  288. execute the model.
  289. """
  290. assert execute_model_req is not None, (
  291. "_execute_model_spmd() requires each worker to take in an "
  292. "ExecuteModelRequest")
  293. worker_input: WorkerInput = self.prepare_worker_input(
  294. execute_model_req=execute_model_req)
  295. model_input: ModelRunnerInputBase = (
  296. self.model_runner.prepare_model_input(
  297. execute_model_req.seq_group_metadata_list))
  298. self.execute_worker(worker_input)
  299. # If there is no input, we don't need to execute the model.
  300. if worker_input.num_seq_groups == 0:
  301. return []
  302. kwargs = extract_previous_hidden_states(execute_model_req)
  303. return self.model_runner.execute_model(
  304. model_input=model_input,
  305. kv_caches=self.kv_cache[worker_input.virtual_engine]
  306. if self.kv_cache is not None else None,
  307. intermediate_tensors=intermediate_tensors,
  308. **kwargs,
  309. )
  310. class WorkerWrapperBase:
  311. """
  312. The whole point of this class is to lazily initialize the worker.
  313. We first instantiate the WorkerWrapper, which remembers the worker module
  314. and class name. Then, when we call `update_environment_variables`, and the
  315. real initialization happens in `init_worker`.
  316. If worker_class_fn is specified, it will be executed to get the worker
  317. class.
  318. Otherwise, the worker class will be obtained by dynamically importing it
  319. using worker_module_name and worker_class_name.
  320. """
  321. def __init__(
  322. self,
  323. worker_module_name: str,
  324. worker_class_name: str,
  325. trust_remote_code: bool = False,
  326. worker_class_fn: Optional[Callable[[],
  327. Type[WorkerBase]]] = None) -> None:
  328. self.worker_module_name = worker_module_name
  329. self.worker_class_name = worker_class_name
  330. self.worker_class_fn = worker_class_fn
  331. self.worker: Optional[WorkerBase] = None
  332. if trust_remote_code:
  333. # note: lazy import to avoid importing torch before initializing
  334. from aphrodite.common.utils import init_cached_hf_modules
  335. init_cached_hf_modules()
  336. @staticmethod
  337. def update_environment_variables(envs: Dict[str, str]) -> None:
  338. key = 'CUDA_VISIBLE_DEVICES'
  339. if key in envs and key in os.environ:
  340. # overwriting CUDA_VISIBLE_DEVICES is desired behavior
  341. # suppress the warning in `update_environment_variables`
  342. del os.environ[key]
  343. update_environment_variables(envs)
  344. def init_worker(self, *args, **kwargs):
  345. """
  346. Here we inject some common logic before initializing the worker.
  347. Arguments are passed to the worker class constructor.
  348. """
  349. enable_trace_function_call_for_thread()
  350. # see https://github.com/NVIDIA/nccl/issues/1234
  351. os.environ['NCCL_CUMEM_ENABLE'] = '0'
  352. from aphrodite.plugins import load_general_plugins
  353. load_general_plugins()
  354. if self.worker_class_fn:
  355. worker_class = self.worker_class_fn()
  356. else:
  357. mod = importlib.import_module(self.worker_module_name)
  358. worker_class = getattr(mod, self.worker_class_name)
  359. self.worker = worker_class(*args, **kwargs)
  360. assert self.worker is not None
  361. def execute_method(self, method, *args, **kwargs):
  362. try:
  363. target = self if self.worker is None else self.worker
  364. executor = getattr(target, method)
  365. return executor(*args, **kwargs)
  366. except Exception as e:
  367. # if the driver worker also execute methods,
  368. # exceptions in the rest worker may cause deadlock in rpc like ray
  369. # print the error and inform the user to solve the error
  370. msg = (f"Error executing method {method}. "
  371. "This might cause deadlock in distributed execution.")
  372. logger.exception(msg)
  373. raise e
  374. def extract_previous_hidden_states(
  375. data: Union[ExecuteModelRequest, Dict[str, torch.Tensor]]) -> \
  376. Dict[str, torch.Tensor]:
  377. """If data contains previous_hidden_states, extract it. This returns a dict
  378. which can be used directly as additional kwargs in any following
  379. execute_model calls. This is used in draft models like EAGLE."""
  380. output = {}
  381. # When called from non-driver worker, data is dict but when called from
  382. # driver worker, data is ExecuteModelRequest.
  383. if isinstance(data, dict):
  384. if "previous_hidden_states" in data:
  385. output["previous_hidden_states"] = data["previous_hidden_states"]
  386. elif data.previous_hidden_states is not None:
  387. output["previous_hidden_states"] = data.previous_hidden_states\
  388. .hidden_states
  389. return output