batch_expansion.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. from array import array
  2. from itertools import chain, count
  3. from typing import Iterator, List, Optional, Tuple
  4. import torch
  5. from aphrodite import SamplingParams
  6. from aphrodite.common.sequence import (APHRODITE_TOKEN_ID_ARRAY_TYPE,
  7. ExecuteModelRequest, SequenceData,
  8. SequenceGroupMetadata, get_all_seq_ids)
  9. from aphrodite.modeling.layers.sampler import SamplerOutput
  10. from aphrodite.spec_decode.interfaces import (SpeculativeProposals,
  11. SpeculativeScorer,
  12. SpeculativeScores)
  13. from aphrodite.spec_decode.util import nvtx_range, split_batch_by_proposal_len
  14. from aphrodite.task_handler.worker_base import WorkerBase
  15. SeqId = int
  16. TargetSeqId = int
  17. TokenId = int
  18. DEFAULT_SIMPLE_SAMPLING_PARAMS = SamplingParams()
  19. class BatchExpansionTop1Scorer(SpeculativeScorer):
  20. """Implements a speculative scorer that uses batch expansion to get
  21. probabilities of speculative tokens according to the scoring model.
  22. Batch expansion converts a list of sequences and multiple query positions
  23. to a new batch of sequences, each with a single query position. This allows
  24. for MQA-like scoring in speculative decoding without requiring an MQA
  25. kernel.
  26. It is strictly less efficient than MQA scoring.
  27. It only supports scoring the top1 proposal tokens of the proposer, instead
  28. of topk/tree.
  29. """
  30. def __init__(self, scorer_worker: WorkerBase, device: str,
  31. vocab_size: int):
  32. self._scorer_worker = scorer_worker
  33. self._device = device
  34. self._vocab_size = vocab_size
  35. @nvtx_range("BatchExpansionTop1Scorer.score_proposals")
  36. def score_proposals(
  37. self,
  38. execute_model_req: ExecuteModelRequest,
  39. proposals: SpeculativeProposals,
  40. ) -> SpeculativeScores:
  41. """Score the proposed tokens via the scorer model.
  42. This converts each input sequence to a set of k+1 target sequences. The
  43. target sequences have the unique continuations to be scored and a
  44. unique sequence ID that is different from all input sequence ids.
  45. If a speculative sequence length would exceed the max model length, then
  46. no speculation is produced for that sequence.
  47. Args:
  48. execute_model_req: The execution request.
  49. proposals: The speculative proposals to score.
  50. Returns:
  51. SpeculativeScores: The scores of each speculative token, along with
  52. which sequences were ignored during scoring.
  53. """
  54. # TODO: perform this on GPU to remove blocking call.
  55. proposal_lens_list = proposals.proposal_lens.tolist()
  56. proposal_token_ids_list = proposals.proposal_token_ids.tolist()
  57. # Filter the list to ignore -1 proposals.
  58. proposal_token_ids_list_without_skips = [
  59. proposals for proposals in proposal_token_ids_list
  60. if -1 not in proposals
  61. ]
  62. (spec_indices, non_spec_indices, target_seq_group_metadata_list,
  63. num_scoring_tokens) = self._expand_batch(
  64. seq_group_metadata_list=execute_model_req.seq_group_metadata_list,
  65. proposal_token_ids_list=proposal_token_ids_list_without_skips,
  66. proposal_lens_list=proposal_lens_list,
  67. )
  68. target_sampler_output = self._scorer_worker.execute_model(
  69. execute_model_req=execute_model_req.clone(
  70. seq_group_metadata_list=target_seq_group_metadata_list))
  71. assert len(target_sampler_output) == 1, "expected single-step output"
  72. target_sampler_output = target_sampler_output[0]
  73. if not non_spec_indices:
  74. # All sequence groups in batch have spec decoding enabled
  75. contracted = self._contract_batch_all_spec(
  76. target_sampler_output=target_sampler_output,
  77. proposals=proposals,
  78. )
  79. else:
  80. # Batch has a mix of spec decode enabled and disabled seq groups
  81. contracted = self._contract_batch(
  82. contracted_bs=len(execute_model_req.seq_group_metadata_list),
  83. target_sampler_output=target_sampler_output,
  84. proposals=proposals,
  85. num_scoring_tokens=num_scoring_tokens,
  86. non_spec_indices=non_spec_indices,
  87. spec_indices=spec_indices,
  88. k=execute_model_req.num_lookahead_slots,
  89. )
  90. all_tokens, all_probs, spec_logprobs, all_hidden_states = contracted
  91. return SpeculativeScores(
  92. probs=all_probs,
  93. token_ids=all_tokens,
  94. logprobs=spec_logprobs,
  95. hidden_states=all_hidden_states,
  96. )
  97. def _expand_batch(
  98. self,
  99. seq_group_metadata_list: List[SequenceGroupMetadata],
  100. proposal_token_ids_list: List[List[TokenId]],
  101. proposal_lens_list: List[int],
  102. ) -> Tuple[List[int], List[int], List[SequenceGroupMetadata], int]:
  103. """Given the input sequences and potentially multiple corresponding
  104. proposal tokens, create a new batch where each sequence has a single
  105. query token.
  106. """
  107. # Aphrodite currently only supports proposal lens equal to zero or the
  108. # batch proposal len. This adds some complexity (splitting the batch
  109. # into spec and non spec sequences) and should be removed in the
  110. # future. It can be done by supporting per-sequence proposal lens.
  111. (spec_seqs, spec_indices), (non_spec_seqs, non_spec_indices) = \
  112. split_batch_by_proposal_len(
  113. seq_group_metadata_list, proposal_lens_list)
  114. target_seq_group_metadata_list = self._create_scoring_model_input(
  115. seq_group_metadata_list=spec_seqs,
  116. proposal_token_ids=proposal_token_ids_list,
  117. # NOTE: We determine the seq ids in the expanded batch using the
  118. # full seq_group_metadata_list, instead of only spec_seqs.
  119. target_seq_ids_iter=self._create_target_seq_id_iterator(
  120. seq_ids=get_all_seq_ids(seq_group_metadata_list)),
  121. )
  122. num_scoring_tokens = len(target_seq_group_metadata_list)
  123. target_seq_group_metadata_list.extend(non_spec_seqs)
  124. return (spec_indices, non_spec_indices, target_seq_group_metadata_list,
  125. num_scoring_tokens)
  126. def _contract_batch(
  127. self, contracted_bs: int, target_sampler_output: SamplerOutput,
  128. proposals: SpeculativeProposals, num_scoring_tokens: int,
  129. non_spec_indices: List[int], spec_indices: List[int], k: int
  130. ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor,
  131. Optional[torch.Tensor]]:
  132. """Contract the expanded batch back into its original size.
  133. This maps the scores of speculative tokens back to their original
  134. sequences.
  135. contracted_bs is the original batch size, and the batch size that the
  136. target_sampler_output will be contracted to.
  137. """
  138. (target_token_ids, target_probs, target_logprobs, target_hidden_states,
  139. non_spec_target_token_ids, non_spec_target_probs,
  140. non_spec_target_logprobs,
  141. non_spec_target_hidden_states) = self._split_scoring_output(
  142. target_sampler_output, num_scoring_tokens)
  143. # Map distinct sequences used to score each token
  144. # of shape [batch_size * k + 1] back to [batch_size, k + 1].
  145. expanded_batch_size, k = proposals.proposal_token_ids.shape
  146. # The number of tokens in the expanded batch used for speculation is
  147. # equal to the total expanded batch size minus the number of samples for
  148. # non-speculative sequences.
  149. non_spec_expanded_bs = len(non_spec_target_token_ids)
  150. spec_expanded_bs = expanded_batch_size - non_spec_expanded_bs
  151. target_token_ids = target_token_ids.reshape(spec_expanded_bs, k + 1)
  152. target_probs = target_probs.reshape(*target_token_ids.shape,
  153. self._vocab_size)
  154. target_logprobs = target_logprobs.reshape(target_probs.shape)
  155. if target_hidden_states is not None:
  156. target_hidden_states = target_hidden_states.reshape(
  157. *target_token_ids.shape, target_hidden_states.shape[-1])
  158. all_tokens = target_token_ids.new_full(size=(contracted_bs, k + 1),
  159. fill_value=-1)
  160. all_probs = target_probs.new_zeros(*all_tokens.shape, self._vocab_size)
  161. all_logprobs = target_logprobs.new_full(size=all_probs.shape,
  162. fill_value=-float("inf"))
  163. if target_sampler_output.hidden_states is not None:
  164. all_hidden_states = target_hidden_states.new_zeros(
  165. size=(contracted_bs, k + 1, target_hidden_states.shape[-1]))
  166. else:
  167. all_hidden_states = None
  168. if non_spec_indices:
  169. all_tokens[non_spec_indices, :1] = \
  170. non_spec_target_token_ids.unsqueeze(1)
  171. all_probs[non_spec_indices, :1, :] = \
  172. non_spec_target_probs.unsqueeze(1)
  173. all_logprobs[non_spec_indices, :1, :] = \
  174. non_spec_target_logprobs.unsqueeze(1)
  175. if all_hidden_states is not None:
  176. assert non_spec_target_hidden_states is not None
  177. all_hidden_states[non_spec_indices, :1, :] = \
  178. non_spec_target_hidden_states.unsqueeze(1)
  179. if spec_indices:
  180. all_tokens[spec_indices] = target_token_ids
  181. all_probs[spec_indices] = target_probs
  182. all_logprobs[spec_indices] = target_logprobs
  183. if all_hidden_states is not None:
  184. all_hidden_states[spec_indices] = target_hidden_states
  185. return all_tokens, all_probs, all_logprobs, all_hidden_states
  186. def _contract_batch_all_spec(
  187. self,
  188. target_sampler_output: SamplerOutput,
  189. proposals: SpeculativeProposals,
  190. ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor,
  191. Optional[torch.Tensor]]:
  192. """Contract the expanded batch back into its original size.
  193. This maps the scores of speculative tokens back to their original
  194. sequences.
  195. It assumes all sequences in the batch were previously expanded.
  196. """
  197. # Map distinct sequences used to score each token
  198. # of shape [batch_size * k + 1] back to [batch_size, k + 1].
  199. contracted_bs, k = proposals.proposal_token_ids.shape
  200. # Reshape tensors to original batch size
  201. target_token_ids = target_sampler_output.sampled_token_ids.reshape(
  202. contracted_bs, k + 1)
  203. target_probs = target_sampler_output.sampled_token_probs.reshape(
  204. *target_token_ids.shape, self._vocab_size)
  205. target_logprobs = target_sampler_output.logprobs.reshape(
  206. target_probs.shape)
  207. target_hidden_states = target_sampler_output.hidden_states
  208. if target_hidden_states is not None:
  209. target_hidden_states = target_hidden_states.reshape(
  210. *target_token_ids.shape, target_hidden_states.shape[-1])
  211. return (target_token_ids, target_probs, target_logprobs,
  212. target_hidden_states)
  213. def _create_scoring_model_input(
  214. self,
  215. seq_group_metadata_list: List[SequenceGroupMetadata],
  216. proposal_token_ids: List[List[TokenId]], # shape: [batch_size, k]
  217. target_seq_ids_iter: Iterator[TargetSeqId],
  218. ) -> List[SequenceGroupMetadata]:
  219. """Given the original input sequences and proposed tokens from the draft
  220. model, create a list of target sequences that can be used for scoring.
  221. target_seq_ids_iter provides sequence ids for the expanded batch,
  222. fulfilling the requirement that no seq id in the expanded batch is equal
  223. to the seq id in the original batch.
  224. """
  225. if not seq_group_metadata_list:
  226. return []
  227. target_seq_group_metadata = list(
  228. chain.from_iterable(
  229. self._create_target_seq_group_metadata(
  230. seq_group_metadata,
  231. proposal_token_ids,
  232. i,
  233. target_seq_ids_iter,
  234. ) for i, seq_group_metadata in enumerate(
  235. seq_group_metadata_list)))
  236. return target_seq_group_metadata
  237. def _create_target_seq_group_metadata(
  238. self,
  239. input_seq_group_metadata: SequenceGroupMetadata,
  240. proposal_token_ids: List[List[TokenId]], # shape: [batch_size, k]
  241. batch_index: int,
  242. target_seq_ids_iter: Iterator[TargetSeqId],
  243. ) -> List[SequenceGroupMetadata]:
  244. """Given an input sequence group metadata and a list of draft tokens,
  245. create a list of target SequenceGroupMetadata, one for each
  246. token id that needs to be scored.
  247. Naive speculative decoding requires K target model scores, one for each
  248. draft model token. However one can add a bonus token such that if each
  249. token is accepted, then a final token may be sampled from the model.
  250. This function creates K+1 target SequenceGroupMetadata to take
  251. advantage of the bonus token.
  252. """
  253. assert not input_seq_group_metadata.is_prompt, (
  254. "Speculating on "
  255. "prompts not yet supported")
  256. assert len(input_seq_group_metadata.seq_data) == 1, (
  257. "Beam search "
  258. "not supported in speculative decoding")
  259. input_seq_id = next(iter(input_seq_group_metadata.seq_data.keys()))
  260. token_ids_to_score = self._get_token_ids_to_score(
  261. proposal_token_ids[batch_index])
  262. # Use simpler sampling parameters apart from for final token
  263. # (in particular don't do seeded sampling) since those sampled tokens
  264. # aren't used.
  265. # We don't replace the sampling_params in the greedy case because
  266. # this also controls whether the probs get modified in the sampler
  267. # (see use of _modify_greedy_probs_inplace there).
  268. sampling_params = input_seq_group_metadata.sampling_params
  269. non_bonus_sampling_params = DEFAULT_SIMPLE_SAMPLING_PARAMS \
  270. if sampling_params.temperature else sampling_params
  271. target_seq_group_metadata_list: List[SequenceGroupMetadata] = []
  272. last_index = len(token_ids_to_score) - 1
  273. for i, token_ids in enumerate(token_ids_to_score):
  274. target_sampling_params = sampling_params if i == last_index \
  275. else non_bonus_sampling_params
  276. target_seq_group_metadata_list.append(
  277. self._create_single_target_seq_group_metadata(
  278. input_seq_group_metadata,
  279. input_seq_id,
  280. next(target_seq_ids_iter),
  281. token_ids,
  282. sampling_params=target_sampling_params,
  283. ))
  284. return target_seq_group_metadata_list
  285. @staticmethod
  286. def _create_single_target_seq_group_metadata(
  287. seq_group_metadata: SequenceGroupMetadata,
  288. seq_id: SeqId,
  289. target_seq_id: TargetSeqId,
  290. token_ids: List[TokenId],
  291. sampling_params: SamplingParams,
  292. ) -> SequenceGroupMetadata:
  293. """Create a single target SequenceGroupMetadata.
  294. Args:
  295. seq_group_metadata: The metadata for the input sequence.
  296. seq_id: The input sequence ID.
  297. target_seq_id: The corresponding target sequence ID.
  298. token_ids: The list of token ids that are to be appended to the
  299. input sequence.
  300. """
  301. seq_data = seq_group_metadata.seq_data[seq_id]
  302. prompt_token_ids = seq_data.prompt_token_ids_array
  303. new_output_token_ids = [*seq_data.get_output_token_ids(), *token_ids]
  304. new_seq_data_dict = {
  305. target_seq_id:
  306. SequenceData(
  307. prompt_token_ids,
  308. _output_token_ids=array(APHRODITE_TOKEN_ID_ARRAY_TYPE,
  309. new_output_token_ids),
  310. ),
  311. }
  312. # This is a hack. Technically, spec decoding should compute
  313. # num_lookahead slots at one shot, but instead, it expands the batch
  314. # and evaluate one by one right now. context_len is seq_len - 1 because
  315. # the kv cache is filled by a previous batch in the batch expansion.
  316. for data in new_seq_data_dict.values():
  317. data.update_num_computed_tokens(data.get_len() - 1)
  318. return SequenceGroupMetadata(
  319. request_id=seq_group_metadata.request_id,
  320. is_prompt=seq_group_metadata.is_prompt,
  321. seq_data=new_seq_data_dict,
  322. sampling_params=sampling_params,
  323. block_tables={
  324. target_seq_id: seq_group_metadata.block_tables[seq_id],
  325. },
  326. lora_request=None,
  327. token_chunk_size=1,
  328. )
  329. @staticmethod
  330. def _split_scoring_output(
  331. sampler_output: SamplerOutput, num_scoring_tokens: int
  332. ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor,
  333. Optional[torch.Tensor], torch.Tensor, torch.Tensor,
  334. torch.Tensor, Optional[torch.Tensor]]:
  335. """Split the target model output into speculative and non-speculative
  336. output.
  337. """
  338. # Aphrodite currently only supports proposal lens equal to zero or the
  339. # batch proposal len. This adds some complexity (splitting the batch
  340. # into spec and non spec sequences) and should be removed in the
  341. # future. It can be done by supporting per-sequence proposal lens.
  342. # First samples are from speculative scoring, latter samples are non-
  343. # speculative samples.
  344. split_sizes = (num_scoring_tokens,
  345. sampler_output.sampled_token_ids.numel() -
  346. num_scoring_tokens)
  347. (spec_probs, non_spec_probs
  348. ) = sampler_output.sampled_token_probs.split(split_sizes)
  349. (spec_sampled_tokens, non_spec_sampled_tokens
  350. ) = sampler_output.sampled_token_ids.flatten().split(split_sizes)
  351. (
  352. spec_logprobs,
  353. non_spec_logprobs,
  354. ) = sampler_output.logprobs.split(split_sizes)
  355. if sampler_output.hidden_states is not None:
  356. (
  357. spec_hidden_states,
  358. non_spec_hidden_states,
  359. ) = sampler_output.hidden_states.split(split_sizes)
  360. else:
  361. spec_hidden_states, non_spec_hidden_states = None, None
  362. return (spec_sampled_tokens, spec_probs, spec_logprobs,
  363. spec_hidden_states, non_spec_sampled_tokens, non_spec_probs,
  364. non_spec_logprobs, non_spec_hidden_states)
  365. @staticmethod
  366. def _create_target_seq_id_iterator(
  367. seq_ids: List[SeqId]) -> Iterator[TargetSeqId]:
  368. """Create an iterator for creating target sequence ids.
  369. Target sequence ids are distinct from sequence ids because we create a
  370. distinct target sequence id for each proposal token to be scored.
  371. This implementation increments a counter starting at 1 + max of all
  372. provided input sequence ids.
  373. """
  374. return count(start=max(seq_ids) + 1)
  375. @staticmethod
  376. def _get_token_ids_to_score(
  377. full_spec_token_ids: List[TokenId] # shape: [k]
  378. ) -> List[List[TokenId]]:
  379. """Given an int tensor of proposal token ids, return a list of
  380. token ids that should be scored.
  381. Returns k+1 output lists. The additional one is used for generating the
  382. bonus token.
  383. Example:
  384. Input: [0, 1, 2, 3] (k=4)
  385. Output: (k+1 lists)
  386. []
  387. [0]
  388. [0, 1]
  389. [0, 1, 2]
  390. [0, 1, 2, 3]
  391. """
  392. empty_token_ids: List[TokenId] = []
  393. token_ids_to_score = [empty_token_ids]
  394. token_ids_to_score.extend(full_spec_token_ids[:i + 1]
  395. for i in range(len(full_spec_token_ids)))
  396. return token_ids_to_score