import copy
from typing import Dict, List, Tuple

import torch

from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
from aphrodite.spec_decode.interfaces import SpeculativeProposals
from aphrodite.spec_decode.top1_proposer import Top1Proposer
from aphrodite.task_handler.worker import Worker


class MultiStepWorker(Worker):
    """The MultiStepWorker is equivalent to a Worker except that it allows
    multiple forward passes in a single call, assuming the scheduler has
    allocated enough space to store the additional KV. This reduces overhead
    by invoking the scheduler less.

    The MultiStepWorker does not support cache swap operations or beam search.
    Cache swap operations do not require large modifications. On the other
    hand, beam search requires memory allocations during sequence forks and
    thus requires more thought for MultiStepWorker support.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Lazy initialization.
        self._proposer: Top1Proposer

    def init_device(self):
        super().init_device()

        self._proposer = Top1Proposer(
            self,
            self.device,
            self.vocab_size,
            max_proposal_len=self.max_model_len,
        )

    def set_include_gpu_probs_tensor(self):
        # include_gpu_probs_tensor is needed by the multi-step worker.
        self.model_runner.model.sampler.include_gpu_probs_tensor = True

    @torch.inference_mode()
    def sampler_output(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        sample_len: int,
    ) -> Tuple[List[SamplerOutput], bool]:
        """Run the model forward pass sample_len times. Returns the list of
        sampler outputs, one per model forward pass.
        """
        self._raise_if_unsupported(seq_group_metadata_list, blocks_to_swap_in,
                                   blocks_to_swap_out, blocks_to_copy)

        # Shallow copy input data so modifications (such as appending tokens)
        # do not cause side-effects.
        copied_seq_group_metadata_list = self._shallow_copy_inputs(
            seq_group_metadata_list)

        # Assert enough KV space for sample_len tokens per sequence.
        self._assert_enough_kv_space(seq_group_metadata_list, sample_len)

        # Run model sample_len times.
        model_outputs = []
        for _ in range(sample_len):
            model_output = super().execute_model(
                seq_group_metadata_list=copied_seq_group_metadata_list,
                blocks_to_swap_in=blocks_to_swap_in,
                blocks_to_swap_out=blocks_to_swap_out,
                blocks_to_copy=blocks_to_copy,
            )
            assert (len(model_output) == 1
                    ), "composing multistep workers not supported"
            model_output = model_output[0]

            self._append_new_tokens(model_output,
                                    copied_seq_group_metadata_list)
            model_outputs.append(model_output)

        return model_outputs, True
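
    # For example (illustrative numbers only): with a batch of 2 sequences
    # and sample_len=3, the call above returns a list of 3 SamplerOutput
    # objects, each holding one sampled token per sequence. The copied
    # metadata accumulates those tokens between steps while the caller's
    # original inputs are left untouched.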

    def get_spec_proposals(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        max_proposal_len: int,
    ) -> SpeculativeProposals:
        """Produce speculations given an input batch of sequences. The number
        of speculative tokens per sequence is determined by max_proposal_len.
        """
        return self._proposer.get_proposals(
            seq_group_metadata_list,
            blocks_to_swap_in,
            blocks_to_swap_out,
            blocks_to_copy,
            max_proposal_len,
        )

    def _append_new_tokens(
            self, model_output: SamplerOutput,
            seq_group_metadata_list: List[SequenceGroupMetadata]) -> None:
        """Given model output from a single run, append the tokens to the
        sequences. This is normally done outside of the worker, but it is
        required if the worker is to perform multiple forward passes.
        """
        for seq_group_metadata, sequence_group_outputs in zip(
                seq_group_metadata_list, model_output):
            seq_group_metadata.is_prompt = False

            for seq_output in sequence_group_outputs.samples:
                # NOTE: Beam search is not supported, so we can assume that
                # parent_seq_id == seq_id.
                seq = seq_group_metadata.seq_data[seq_output.parent_seq_id]

                token_id = seq_output.output_token
                token_logprob = seq_output.logprobs[token_id]

                seq.append_token_id(token_id, token_logprob.logprob)
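
    # Sketch of the bookkeeping above (hypothetical values): if a sequence's
    # copied SequenceData currently ends at the prompt [1, 2, 3] and the
    # sampler emits token 7, then 7 is appended to its output token ids and
    # is_prompt flips to False, so the next forward pass treats that sequence
    # as a single-token decode step rather than a prompt run.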

    def _shallow_copy_inputs(
        self, seq_group_metadata_list: List[SequenceGroupMetadata]
    ) -> List[SequenceGroupMetadata]:
        """Copy input data structures to remove side-effects when input data
        structures are shared with other modules.

        Helpful when the Aphrodite scheduler runs in the same process as the
        worker. The alternative is deep-copying (or another form of deep
        copy); this has performance downsides.
        """
        # Shallow-copy the list of SequenceGroupMetadata. This allows us to
        # append tokens and change is_prompt without external side-effects.
        new_seq_group_metadata_list = []

        for old_seq_group_metadata in seq_group_metadata_list:
            # We must shallow-copy seq_group_metadata as is_prompt could
            # change.
            seq_group_metadata = copy.copy(old_seq_group_metadata)
            new_seq_group_metadata_list.append(seq_group_metadata)

            # We must shallow-copy seq_data as we will append token ids.
            new_seq_data = {}
            for seq_id, old_seq_data in seq_group_metadata.seq_data.items():
                new_seq_data[seq_id] = copy.copy(old_seq_data)
                new_seq_data[
                    seq_id].output_token_ids = old_seq_data.output_token_ids[:]
            seq_group_metadata.seq_data = new_seq_data

        return new_seq_group_metadata_list
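
    # Why the extra list copy matters (a minimal illustration, not code from
    # this module): copy.copy(seq_data) still shares the underlying
    # output_token_ids list with the original, e.g.
    #
    #     a = SequenceData([1, 2, 3])
    #     b = copy.copy(a)
    #     b.output_token_ids.append(7)   # without the [:] copy, 'a' sees 7 too
    #
    # so the slice copy above is what keeps speculative tokens from leaking
    # back into the scheduler's data structures.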

    def _assert_enough_kv_space(
            self, seq_group_metadata_list: List[SequenceGroupMetadata],
            num_steps: int) -> None:
        """Assert there are enough physical blocks per sequence to store the
        current KV plus additional KV from num_steps tokens.
        """
        assert self.model_runner.block_size is not None
        for seq_group_metadata in seq_group_metadata_list:

            # Only one seq_id is guaranteed because there is no beam search.
            seq_id = list(seq_group_metadata.seq_data.keys())[0]
            seq = seq_group_metadata.seq_data[seq_id]

            # After num_steps, the seq len will be the current seq len
            # plus one token per step.
            final_seq_len = seq.get_len() + num_steps

            # We will have final_seq_len - 1 KV because Aphrodite saves KV for
            # a token in the iteration after the token was generated.
            required_num_kv_slots = final_seq_len - 1

            # The allocated number of kv slots is the number of allocated
            # blocks times the number of slots per block.
            number_physical_blocks = len(
                seq_group_metadata.block_tables[seq_id])
            allocated_kv_slots = (number_physical_blocks *
                                  self.model_runner.block_size)

            if required_num_kv_slots > allocated_kv_slots:
                request_id = seq_group_metadata.request_id
                raise ValueError(
                    "The worker attempted to run "
                    f"{num_steps} times but found insufficient KV space for "
                    f"{request_id=} {seq_id=}. ({allocated_kv_slots=} "
                    f"{required_num_kv_slots=}).")

    def _raise_if_unsupported(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """MultiStepWorker does not yet implement support for cache swap
        operations or beam search.
        """
        if any([blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy]):
            raise NotImplementedError(
                "MultiStepWorker does not support cache operations")

        if any(
                len(seq_group_metadata.seq_data.keys()) != 1
                for seq_group_metadata in seq_group_metadata_list):
            raise NotImplementedError(
                "MultiStepWorker does not support beam search.")