draft_model_runner.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from typing import List, Optional
  2. import torch
  3. from aphrodite.common.config import (CacheConfig, DeviceConfig, LoadConfig,
  4. LoRAConfig, ModelConfig, MultiModalConfig,
  5. ParallelConfig, PromptAdapterConfig,
  6. SchedulerConfig)
  7. from aphrodite.common.sequence import (IntermediateTensors, SamplerOutput,
  8. SequenceGroupMetadata)
  9. from aphrodite.task_handler.model_runner import (
  10. ModelInputForGPUWithSamplingMetadata, ModelRunner)
  11. class TP1DraftModelRunner(ModelRunner):
  12. """Specialized model runner for speculative decoding draft model.
  13. Since the draft model always execute k forward passes consecutively to
  14. generate k speculative tokens in a single speculative decoding step,
  15. we could get rid of most CPU-GPU synchronization and data transfer
  16. overheads by keeping model input and output tensors on GPU all the time.
  17. This runner is still under development so there's no performance gain
  18. at this moment. Currently we adopt a temporary solution that caches the
  19. seq_group_metadata_list for multi-step execution, so that we can
  20. leverage existing prepare_model_input to be compatible with the current
  21. execution flow, but we plan to remove this cache and avoid calling
  22. prepare_model_input in execute_model at all.
  23. The detail development plan includes:
  24. 1. Use "update_model_input" to update existing model_input without
  25. creating a new one.
  26. 2. Improve the performance of "update_model_input" with a GPU kernel.
  27. 3. Support TP > 1 (this requires some designs because we do not expect
  28. any broadcasting inside execute_model).
  29. """
  30. def __init__(
  31. self,
  32. model_config: ModelConfig,
  33. parallel_config: ParallelConfig,
  34. scheduler_config: SchedulerConfig,
  35. device_config: DeviceConfig,
  36. cache_config: CacheConfig,
  37. load_config: LoadConfig,
  38. lora_config: Optional[LoRAConfig],
  39. kv_cache_dtype: Optional[str] = "auto",
  40. is_driver_worker: bool = False,
  41. multimodal_config: Optional[MultiModalConfig] = None,
  42. prompt_adapter_config: Optional[PromptAdapterConfig] = None,
  43. return_hidden_states: bool = False,
  44. ):
  45. if return_hidden_states:
  46. raise ValueError(
  47. "return_hidden_states is not supported for TP1DraftModelRunner."
  48. )
  49. super().__init__(
  50. model_config=model_config,
  51. parallel_config=parallel_config,
  52. scheduler_config=scheduler_config,
  53. device_config=device_config,
  54. cache_config=cache_config,
  55. load_config=load_config,
  56. lora_config=lora_config,
  57. kv_cache_dtype=kv_cache_dtype,
  58. is_driver_worker=is_driver_worker,
  59. multimodal_config=multimodal_config,
  60. prompt_adapter_config=prompt_adapter_config,
  61. return_hidden_states=return_hidden_states,
  62. )
  63. # TODO: Remove this cache when we are able to update model_input
  64. # directly in advance_step.
  65. self.cached_seq_group_metadata_list: Optional[
  66. List[SequenceGroupMetadata]] = None
  67. def prepare_model_input(
  68. self,
  69. seq_group_metadata_list: List[SequenceGroupMetadata],
  70. virtual_engine: int = 0,
  71. finished_requests_ids: Optional[List[str]] = None
  72. ) -> ModelInputForGPUWithSamplingMetadata:
  73. """A temporary solution that caches the seq_group_metadata_list
  74. for multi-step execution.
  75. TODO: In-place update model_input and remove this function.
  76. """
  77. self.cached_seq_group_metadata_list = seq_group_metadata_list
  78. return super().prepare_model_input(
  79. seq_group_metadata_list,
  80. finished_requests_ids=finished_requests_ids)
  81. def update_model_input(
  82. self, model_input: ModelInputForGPUWithSamplingMetadata,
  83. last_output: SamplerOutput
  84. ) -> ModelInputForGPUWithSamplingMetadata:
  85. """Prepare the model inputs for the next step.
  86. TODO: In-place update model_input instead of calling
  87. prepare_model_input.
  88. """
  89. # Append the output token to the sequence data.
  90. assert self.cached_seq_group_metadata_list is not None
  91. for seq_group_metadata, sequence_group_outputs in zip(
  92. self.cached_seq_group_metadata_list, last_output.outputs):
  93. seq_group_metadata.is_prompt = False
  94. for seq_output in sequence_group_outputs.samples:
  95. seq = seq_group_metadata.seq_data[seq_output.parent_seq_id]
  96. token_id = seq_output.output_token
  97. token_logprob = seq_output.logprobs[token_id]
  98. seq.append_token_id(token_id, token_logprob.logprob)
  99. seq.update_num_computed_tokens(1)
  100. return self.prepare_model_input(self.cached_seq_group_metadata_list)
  101. @torch.inference_mode()
  102. def execute_model(
  103. self,
  104. model_input: ModelInputForGPUWithSamplingMetadata,
  105. kv_caches: List[torch.Tensor],
  106. intermediate_tensors: Optional[IntermediateTensors] = None,
  107. num_steps: int = 1,
  108. ) -> Optional[List[SamplerOutput]]:
  109. # Since we do not broadcast data inside execute_model anymore,
  110. # we need to figure out the best way to support TP > 1 in this
  111. # case, because we will at least need to broadcast the sampled
  112. # tokens to all workers.
  113. if not self.is_driver_worker:
  114. raise ValueError("TP1DraftModelRunner only supports TP=1.")
  115. if self.lora_config:
  116. assert model_input.lora_requests is not None
  117. assert model_input.lora_mapping is not None
  118. self.set_active_loras(model_input.lora_requests,
  119. model_input.lora_mapping)
  120. if self.prompt_adapter_config:
  121. assert model_input.prompt_adapter_requests is not None
  122. assert model_input.prompt_adapter_mapping is not None
  123. self.set_active_prompt_adapters(
  124. model_input.prompt_adapter_requests,
  125. model_input.prompt_adapter_mapping)
  126. virtual_engine = model_input.virtual_engine
  127. outputs: List[SamplerOutput] = []
  128. for step in range(num_steps):
  129. # Currently cuda graph is only supported by the decode phase.
  130. assert model_input.attn_metadata is not None
  131. prefill_meta = model_input.attn_metadata.prefill_metadata
  132. decode_meta = model_input.attn_metadata.decode_metadata
  133. if prefill_meta is None and decode_meta.use_cuda_graph:
  134. assert model_input.input_tokens is not None
  135. graph_batch_size = model_input.input_tokens.shape[0]
  136. model_executable = (
  137. self.graph_runners[virtual_engine][graph_batch_size])
  138. else:
  139. model_executable = self.model
  140. multi_modal_kwargs = model_input.multi_modal_kwargs or {}
  141. hidden_states = model_executable(
  142. input_ids=model_input.input_tokens,
  143. positions=model_input.input_positions,
  144. kv_caches=kv_caches,
  145. attn_metadata=model_input.attn_metadata,
  146. intermediate_tensors=intermediate_tensors,
  147. **multi_modal_kwargs,
  148. )
  149. # Compute the logits.
  150. logits = self.model.compute_logits(hidden_states,
  151. model_input.sampling_metadata)
  152. # Sample the next token.
  153. outputs.append(
  154. self.model.sample(
  155. logits=logits,
  156. sampling_metadata=model_input.sampling_metadata,
  157. ))
  158. # Prepare the inputs for the next step.
  159. if step != num_steps - 1:
  160. model_input = self.update_model_input(model_input, outputs[-1])
  161. return outputs