import copy
import weakref
from typing import Dict, List, Set, Tuple

import torch

from aphrodite.common.sequence import (ExecuteModelRequest, HiddenStates,
                                       SequenceData, SequenceGroupMetadata)
from aphrodite.modeling.layers.sampler import SamplerOutput
from aphrodite.spec_decode.draft_model_runner import TP1DraftModelRunner
from aphrodite.spec_decode.interfaces import (SpeculativeProposals,
                                              SpeculativeProposer)
from aphrodite.spec_decode.proposer_worker_base import ProposerWorkerBase
from aphrodite.spec_decode.top1_proposer import Top1Proposer
from aphrodite.worker.worker import Worker


class MultiStepWorker(Worker, ProposerWorkerBase):
    """The MultiStepWorker is equivalent to a Worker except that it allows
    multiple forward passes in a single call, assuming the scheduler has
    allocated enough space to store the additional KV. This reduces overhead
    by invoking the scheduler less often.

    The MultiStepWorker does not support cache swap operations or beam
    search. Cache swap operations would not require large modifications; beam
    search, on the other hand, requires memory allocations during sequence
    forks and thus needs more thought before MultiStepWorker can support it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Lazy initialization: assigned in init_device().
        self._proposer: SpeculativeProposer

    def init_device(self) -> None:
        super().init_device()

        self._proposer = Top1Proposer(
            weakref.proxy(self),  # type: ignore[arg-type]
            self.device,
            self.vocab_size,
            max_proposal_len=self.max_model_len,
        )

    def set_include_gpu_probs_tensor(self) -> None:
        # MultiStepWorker needs include_gpu_probs_tensor so the sampled
        # probabilities stay on the GPU.
        self.model_runner.model.sampler.include_gpu_probs_tensor = True

    def set_should_modify_greedy_probs_inplace(self) -> None:
        self.model_runner.model.sampler.should_modify_greedy_probs_inplace = (
            True)

    @torch.inference_mode()
    def sampler_output(
        self,
        execute_model_req: ExecuteModelRequest,
        sample_len: int,
        seq_ids_with_bonus_token_in_last_step: Set[int],
    ) -> Tuple[List[SamplerOutput], bool]:
        """Run the model forward pass sample_len times. Returns a list of
        sampler outputs, one per model forward pass, along with an indicator
        of whether the torch tensors in the sampler output need to be
        transposed in the later sampler_output_to_torch logic.

        For the multi-step worker, this indicator is always True.
        """
        self._raise_if_unsupported(execute_model_req)

        # Expand the batch for sequences with a bonus token.
        # Perform a forward pass on the expanded batch and filter the
        # response to retain only the original sequences' responses.
        expanded_request, indices_of_seq_with_bonus_tokens =\
            self._expand_execute_model_request(
                execute_model_req, seq_ids_with_bonus_token_in_last_step)

        # Run model sample_len times.
        model_outputs: List[SamplerOutput] = []
        if isinstance(
                self.model_runner, TP1DraftModelRunner
        ) and self.model_runner.supports_gpu_multi_step(expanded_request):
            # Here we run the draft_model_runner with multi-step prepare
            # on the GPU directly.
            expanded_request.num_steps = sample_len
            model_outputs = self.execute_model(
                execute_model_req=expanded_request)
        else:
            # Here we run multi-step directly, with every step prepared
            # on the CPU.
            # TODO: Remove this branch once DraftModelRunner supports TP>1
            # and the other restrictions that are part of DraftModelRunner's
            # supports_gpu_multi_step(..).
            for _ in range(sample_len):
                model_output: List[SamplerOutput] = super().execute_model(
                    execute_model_req=expanded_request)
                assert (len(model_output) == 1
                        ), "composing multistep workers not supported"
                model_output = model_output[0]

                self._append_new_tokens(
                    model_output, expanded_request.seq_group_metadata_list)
                model_outputs.append(model_output)

        filtered_model_outputs = self._filter_model_output(
            model_outputs, indices_of_seq_with_bonus_tokens)
        return filtered_model_outputs, True

    @staticmethod
    def _expand_execute_model_request(
        execute_model_req: ExecuteModelRequest,
        seq_with_bonus_token_in_last_step: Set[int],
    ) -> Tuple[ExecuteModelRequest, List[int]]:
        """
        Expands the execute model request based on sequences with bonus
        tokens.

        For each sequence with a bonus token, this method creates a new
        sequence without the bonus token and adds it to the execute model
        request. The original sequence groups are also retained. The indices
        of the original sequence groups are returned for further processing.

        Args:
            execute_model_req (ExecuteModelRequest): The original execute
                model request.
            seq_with_bonus_token_in_last_step (Set[int]): Set of sequence IDs
                that contain bonus tokens.

        Returns:
            Tuple[ExecuteModelRequest, List[int]]: The updated execute model
            request with expanded sequences and a list of indices
            corresponding to the original sequence groups.
        """
        updated_seq_group_metadata_list: List[SequenceGroupMetadata] = []
        updated_execute_model_req = execute_model_req.clone(
            updated_seq_group_metadata_list)
        indices_of_original_sequence_groups = []
        for seq_group in execute_model_req.seq_group_metadata_list:
            seq_group_has_bonus_tokens = False
            for seq_id, _ in seq_group.seq_data.items():
                # Identify sequences with bonus tokens in the sequence group.
                if seq_id in seq_with_bonus_token_in_last_step:
                    seq_group_has_bonus_tokens = True
                    break
            if seq_group_has_bonus_tokens:
                # Create new sequences without the last bonus token. These
                # new sequences have the same sequence ids as the original
                # sequences. We create a new sequence group and add them
                # there.
                updated_seq_group_without_bonus_token = \
                    MultiStepWorker._copy_seq_metadata_excluding_last_token(
                        seq_group, seq_with_bonus_token_in_last_step)
                updated_seq_group_metadata_list.append(
                    updated_seq_group_without_bonus_token)
            # Add the original sequence group.
            updated_seq_group_metadata_list.append(
                MultiStepWorker._shallow_copy_seq_group_metadata(seq_group))
            # Record the index of the original sequence group.
            indices_of_original_sequence_groups.append(
                len(updated_seq_group_metadata_list) - 1)

        updated_execute_model_req.seq_group_metadata_list =\
            updated_seq_group_metadata_list

        if isinstance(updated_execute_model_req.previous_hidden_states,
                      HiddenStates):
            updated_execute_model_req.previous_hidden_states\
                .expand_with_bonus_tokens(seq_with_bonus_token_in_last_step)

        return updated_execute_model_req, indices_of_original_sequence_groups

    @staticmethod
    def _filter_model_output(
            expanded_batch_outputs: List[SamplerOutput],
            output_indices_to_retain: List[int]) -> List[SamplerOutput]:
        """
        Filters the model output to include only the specified sequence
        outputs. This method contracts the expanded batch output from the
        model to retain the outputs of only those sequences indicated by the
        provided indices.

        Args:
            expanded_batch_outputs (List[SamplerOutput]): The expanded
                output batch from the model.
            output_indices_to_retain (List[int]): Indices of the model
                outputs to retain.

        Returns:
            List[SamplerOutput]: A list containing the filtered model
            outputs for the specified indices.
        """
        return [
            SamplerOutput(
                outputs=[
                    expanded_batch_output.outputs[i]
                    for i in output_indices_to_retain
                ] if len(expanded_batch_output.outputs) > 0 else [],
                sampled_token_probs=(
                    expanded_batch_output.
                    sampled_token_probs[output_indices_to_retain]
                    if expanded_batch_output.sampled_token_probs is not None
                    else None),
                logprobs=(
                    expanded_batch_output.logprobs[output_indices_to_retain]
                    if expanded_batch_output.logprobs is not None else None),
                sampled_token_ids=(
                    expanded_batch_output.
                    sampled_token_ids[output_indices_to_retain]
                    if expanded_batch_output.sampled_token_ids is not None
                    else None))
            for expanded_batch_output in expanded_batch_outputs
        ]

    def get_spec_proposals(
        self,
        execute_model_req: ExecuteModelRequest,
        seq_ids_with_bonus_token_in_last_step: Set[int],
    ) -> SpeculativeProposals:
        """Produce speculations given an input batch of sequences. The
        number of speculative tokens per sequence is determined by
        max_proposal_len.
        """
        return self._proposer.get_spec_proposals(
            execute_model_req, seq_ids_with_bonus_token_in_last_step)

    @staticmethod
    def _append_new_tokens(
            model_output: List[SamplerOutput],
            seq_group_metadata_list: List[SequenceGroupMetadata]) -> None:
        """Given model output from a single run, append the tokens to the
        sequences. This is normally done outside of the worker, but it is
        required if the worker is to perform multiple forward passes.
        """
        for seq_group_metadata, sequence_group_outputs in zip(
                seq_group_metadata_list, model_output):
            seq_group_metadata.is_prompt = False

            for seq_output in sequence_group_outputs.samples:
                # NOTE: Beam search is not supported, so we can assume that
                # parent_seq_id == seq_id.
                seq = seq_group_metadata.seq_data[seq_output.parent_seq_id]

                token_id = seq_output.output_token
                token_logprob = seq_output.logprobs[token_id]

                seq.append_token_id(token_id, token_logprob.logprob)
                seq.update_num_computed_tokens(1)

    @staticmethod
    def _shallow_copy_seq_group_metadata(
            seq_group_metadata: SequenceGroupMetadata,
    ) -> SequenceGroupMetadata:
        """Copy input data structures to remove side-effects when input data
        structures are shared with other modules.

        Helpful when the Aphrodite scheduler runs in the same process as the
        worker. The alternative is deep-copying (or some other form of deep
        copy), which has performance downsides.
        """
        # Shallow-copy the SequenceGroupMetadata. This allows us to
        # append tokens and change is_prompt without external side-effects.
        # We must shallow-copy seq_group_metadata as is_prompt could change.
        new_seq_group_metadata = copy.copy(seq_group_metadata)

        # We must shallow-copy seq_data as we will append token ids.
        new_seq_data: Dict[int, SequenceData] = {}
        for seq_id, old_seq_data in seq_group_metadata.seq_data.items():
            new_seq_data[seq_id] = copy.copy(old_seq_data)
            new_seq_data[seq_id].output_token_ids =\
                old_seq_data.output_token_ids[:]

        new_seq_group_metadata.seq_data = new_seq_data
        return new_seq_group_metadata

    @staticmethod
    def _copy_seq_metadata_excluding_last_token(
        seq_group_metadata: SequenceGroupMetadata,
        seq_ids_to_copy: Set[int],
    ) -> SequenceGroupMetadata:
        """
        Creates a shallow copy of the given SequenceGroupMetadata, retaining
        only the sequence IDs specified in seq_ids_to_copy. For each of these
        sequence IDs, all output_token_ids except the last one are copied.
        Sequence IDs not in seq_ids_to_copy are excluded from the copy.

        Args:
            seq_group_metadata (SequenceGroupMetadata): The original
                sequence group metadata.
            seq_ids_to_copy (Set[int]): The set of sequence IDs to include
                in the copy.

        Returns:
            SequenceGroupMetadata: A shallow copy of the sequence group
            metadata with the specified modifications.
        """
        # Shallow-copy the SequenceGroupMetadata.
        new_seq_group_metadata = copy.copy(seq_group_metadata)
        # Shallow-copy seq_data and modify the output_token_ids.
        new_seq_data: Dict[int, SequenceData] = {}
        for seq_id, old_seq_data in seq_group_metadata.seq_data.items():
            if seq_id in seq_ids_to_copy:
                new_seq_data[seq_id] = copy.copy(old_seq_data)
                # Copy all the output token ids except the last.
                # Also reduce num_computed_tokens by 1 since we are not
                # including the last output token.
                # NOTE: num_computed_tokens is not directly used by the
                # speculative decoding workers, as it is only relevant for
                # chunked prefill, which is disabled for speculative
                # decoding. However, to maintain consistency in
                # num_computed_tokens, we update it here.
                new_seq_data[seq_id].output_token_ids =\
                    old_seq_data.output_token_ids[:-1]
                new_seq_data[seq_id].update_num_computed_tokens(-1)
        new_seq_group_metadata.seq_data = new_seq_data
        return new_seq_group_metadata

    def _assert_enough_kv_space(
            self, seq_group_metadata_list: List[SequenceGroupMetadata],
            num_steps: int) -> None:
        """Assert there are enough physical blocks per sequence to store the
        current KV plus additional KV from num_steps tokens.
        """
        assert self.model_runner.block_size is not None
        for seq_group_metadata in seq_group_metadata_list:
            # Only one seq_id is guaranteed because there is no beam search.
            seq_id = list(seq_group_metadata.seq_data.keys())[0]
            seq = seq_group_metadata.seq_data[seq_id]

            # After num_steps, the seq len will be the current seq len
            # plus one token per step.
            final_seq_len = seq.get_len() + num_steps

            # We will have final_seq_len - 1 KV because Aphrodite saves KV
            # for a token in the iteration after the token was generated.
            required_num_kv_slots = final_seq_len - 1

            # The allocated number of KV slots is the number of allocated
            # blocks times the number of slots per block.
            number_physical_blocks = len(
                seq_group_metadata.block_tables[seq_id])
            allocated_kv_slots = (number_physical_blocks *
                                  self.model_runner.block_size)

            if required_num_kv_slots > allocated_kv_slots:
                request_id = seq_group_metadata.request_id
                raise ValueError(
                    "The worker attempted to run "
                    f"{num_steps} times but found insufficient KV space for "
                    f"{request_id=} {seq_id=}. ({allocated_kv_slots=} "
                    f"{required_num_kv_slots=}).")

    def _raise_if_unsupported(
        self,
        execute_model_req: ExecuteModelRequest,
    ) -> None:
        """MultiStepWorker does not yet implement support for cache swap
        operations or beam search.
        """
        if any([
                execute_model_req.blocks_to_swap_in,
                execute_model_req.blocks_to_swap_out,
                execute_model_req.blocks_to_copy
        ]):
            raise NotImplementedError(
                "MultiStepWorker does not support cache operations")

        if any(
                len(seq_group_metadata.seq_data.keys()) != 1
                for seq_group_metadata in
                execute_model_req.seq_group_metadata_list):
            raise NotImplementedError(
                "MultiStepWorker does not support beam search.")