multi_step_worker.py 9.6 KB


  1. import dataclasses
  2. from dataclasses import dataclass
  3. from typing import Dict, List, Optional, Tuple
  4. import torch
  5. from aphrodite.common.sequence import ExecuteModelRequest
  6. from aphrodite.distributed import broadcast_tensor_dict, get_pp_group
  7. from aphrodite.modeling.layers.sampler import SamplerOutput
  8. from aphrodite.task_handler.model_runner_base import BroadcastableModelInput
  9. from aphrodite.task_handler.multi_step_model_runner import (
  10. MultiStepModelRunner, StatefulModelInput)
  11. from aphrodite.task_handler.worker import Worker, WorkerInput
  12. @dataclass
  13. class MultiStepState:
  14. worker_input: WorkerInput
  15. model_input: StatefulModelInput
  16. class MultiStepWorker(Worker):
  17. def __init__(self, *args, **kwargs):
  18. super().__init__(*args, **kwargs)
  19. base_model_runner = self.model_runner
  20. # for multi-step model, wrap the model runner with MultiStepModelRunner
  21. self.model_runner = MultiStepModelRunner(
  22. base_model_runner,
  23. base_model_runner.model_config,
  24. base_model_runner.parallel_config,
  25. base_model_runner.scheduler_config,
  26. base_model_runner.device_config,
  27. base_model_runner.cache_config,
  28. load_config=base_model_runner.load_config,
  29. lora_config=self.lora_config,
  30. kv_cache_dtype=self.cache_config.cache_dtype,
  31. is_driver_worker=base_model_runner.is_driver_worker,
  32. prompt_adapter_config=base_model_runner.prompt_adapter_config,
  33. )
  34. pipeline_parallel_size = self.parallel_config.pipeline_parallel_size
  35. self.multi_step_states: List[Optional[MultiStepState]] = [
  36. None
  37. ] * pipeline_parallel_size
  38. self.temp_output = None
  39. def _get_driver_input_and_broadcast(
  40. self, execute_model_req: ExecuteModelRequest
  41. ) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]:
  42. """
  43. Get the driver input and broadcast it to other workers.
  44. """
  45. assert self.is_driver_worker
  46. virtual_engine = execute_model_req.virtual_engine
  47. is_first_multi_step = execute_model_req.is_first_multi_step
  48. if is_first_multi_step:
  49. # on first step we prepare the worker input and model input normally
  50. worker_input: WorkerInput = self.prepare_worker_input(
  51. execute_model_req=execute_model_req
  52. )
  53. model_input: StatefulModelInput = (
  54. self.model_runner.prepare_model_input(
  55. execute_model_req.seq_group_metadata_list,
  56. execute_model_req.virtual_engine,
  57. execute_model_req.finished_requests_ids))
  58. if execute_model_req.async_callback:
  59. model_input.frozen_model_input = dataclasses.replace( # type: ignore
  60. model_input.frozen_model_input,
  61. async_callback=execute_model_req.async_callback,
  62. use_async_and_multi_step=execute_model_req.
  63. use_async_and_multi_step)
  64. else:
  65. # on subsequent steps we reuse the worker input and model input
  66. multi_step_state = self.multi_step_states[virtual_engine]
  67. worker_input = multi_step_state.worker_input
  68. model_input = multi_step_state.model_input
  69. frozen_model_input = model_input.frozen_model_input
  70. assert frozen_model_input is not None
  71. assert frozen_model_input.attn_metadata is not None
  72. # clear the cached decode metadata so that it can be recomputed on
  73. # the workers
  74. frozen_model_input.attn_metadata._cached_decode_metadata = None
  75. model_input.is_first_multi_step = is_first_multi_step
  76. model_input.is_last_step = execute_model_req.is_last_step
  77. if not is_first_multi_step:
  78. # we broadcast the last sampled token ids to all TP workers so they
  79. # can update their model input metadata in-place.
  80. self._prepare_last_sampled_token_ids_for_tp_workers(
  81. execute_model_req=execute_model_req, model_input=model_input
  82. )
  83. if self.do_metadata_broadcast:
  84. broadcast_data = worker_input.as_broadcastable_tensor_dict()
  85. broadcast_data.update(model_input.as_broadcastable_tensor_dict())
  86. broadcast_tensor_dict(broadcast_data, src=0)
  87. # Retuning empty dict here to keep this compatible with
  88. # `LocalOrDistributedWorkerBase._get_driver_input_and_broadcast`
  89. return model_input, worker_input, {}
  90. def _prepare_last_sampled_token_ids_for_tp_workers(
  91. self,
  92. execute_model_req: ExecuteModelRequest,
  93. model_input: StatefulModelInput,
  94. ) -> None:
  95. """
  96. Prepare the last sampled token ids for TP workers. If it's the last
  97. PP rank, then the last sampled token ids are already in the model_input.
  98. If it is NOT the last PP rank, then we need to get the last sampled
  99. token that is cached in the execute_model_req.
  100. """
  101. if get_pp_group().is_last_rank:
  102. assert (
  103. model_input.cached_outputs[-1].sampler_output.sampled_token_ids
  104. is None
  105. )
  106. assert model_input.cached_outputs[-1].sampled_token_ids is not None
  107. model_input.last_sampled_token_ids = model_input.cached_outputs[
  108. -1
  109. ].sampled_token_ids
  110. # free sampled token ids from the previous step if it has been
  111. # pythonized. Cannot free the last sampled token ids because
  112. # we need it for GPU advance_step.
  113. for output in model_input.cached_outputs[:-1]:
  114. if output.pythonized:
  115. output.sampled_token_ids = None
  116. else:
  117. # otherwise we need to get the cached sampled token ids from the
  118. # execute_model_req
  119. assert execute_model_req.last_sampled_token_ids is not None
  120. model_input.last_sampled_token_ids = (
  121. execute_model_req.last_sampled_token_ids.cuda()
  122. )
  123. model_input.add_sampler_output(
  124. SamplerOutput(outputs=[], sampled_token_ids=None),
  125. model_input.last_sampled_token_ids,
  126. )
  127. # free sampled token ids from the previous step.
  128. # TODO: we could reuse the sampled token ids tensor from
  129. # the previous step instead.
  130. for output in model_input.cached_outputs[:-1]:
  131. output.sampled_token_ids = None
  132. assert model_input.cached_outputs[-1].sampled_token_ids is not None
  133. def prepare_input(
  134. self,
  135. execute_model_req: Optional[ExecuteModelRequest] = None,
  136. ) -> Optional[Tuple[StatefulModelInput, WorkerInput, Dict[str,
  137. torch.Tensor]]]:
  138. """
  139. Depending on the current state of the request and multi step worker,
  140. this method may skip the normal _prepare_model_input and
  141. _prepare_worker_input methods and instead used cached values.
  142. """
  143. if self.is_driver_worker:
  144. if execute_model_req is None:
  145. if self.do_metadata_broadcast:
  146. # This signals that there's no more requests to process for
  147. # now. All workers are running infinite loop with
  148. # broadcast_tensor_dict, and it stops the loop when the
  149. # driver broadcasts an empty input. Send an empty input to
  150. # notify all other workers to stop their execution loop.
  151. broadcast_tensor_dict({}, src=0)
  152. return None
  153. virtual_engine = execute_model_req.virtual_engine
  154. (model_input, worker_input,
  155. kwargs) = self._get_driver_input_and_broadcast(execute_model_req)
  156. assert isinstance(model_input, StatefulModelInput)
  157. if execute_model_req.is_first_multi_step:
  158. # cache the worker input and model input for the next steps
  159. self.multi_step_states[virtual_engine] = MultiStepState(
  160. worker_input=worker_input, model_input=model_input
  161. )
  162. # if TP workers
  163. else:
  164. broadcast_data = self._get_worker_input_from_broadcast()
  165. # if the driver has sent an empty input, we should stop the worker
  166. # loop
  167. if broadcast_data is None:
  168. return None
  169. model_input, worker_input, kwargs = broadcast_data
  170. assert isinstance(model_input, StatefulModelInput)
  171. virtual_engine = worker_input.virtual_engine
  172. if model_input.is_first_multi_step:
  173. pass
  174. # TODO: Can cache the worker input and model input for the
  175. # next steps. See below for details
  176. else:
  177. # TODO: possible to also cache and reuse the cached worker
  178. # input and model input. The idea is essentially the delta
  179. # optimization for model_inputs. Where the TP workers can cache
  180. # the model input states and we only broadcast the delta need
  181. # for the next step (sampled_token_ids from the previous step)
  182. assert isinstance(model_input, StatefulModelInput)
  183. # we need to update the last sampled token ids in the model
  184. # input for the workers so that they can run inplace
  185. # advance_step
  186. model_input.add_sampler_output(
  187. SamplerOutput(outputs=[], sampled_token_ids=None),
  188. model_input.last_sampled_token_ids,
  189. )
  190. assert model_input is not None
  191. assert worker_input is not None
  192. return model_input, worker_input, kwargs