- import enum
- import os
- import random
- import time
- from collections import deque
- from dataclasses import dataclass, field
- from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple, Union
- from loguru import logger
- from aphrodite.common.config import CacheConfig, LoRAConfig, SchedulerConfig
- from aphrodite.common.sequence import (Sequence, SequenceData, SequenceGroup,
- SequenceGroupMetadata, SequenceStatus)
- from aphrodite.lora.request import LoRARequest
- from aphrodite.processing.interfaces import AllocStatus, BlockSpaceManager
- from aphrodite.prompt_adapter.request import PromptAdapterRequest
- # Test-only. If the environment variable below is set to any non-empty
- # value, decode is preempted with probability ARTIFICIAL_PREEMPTION_PROB.
- ENABLE_ARTIFICIAL_PREEMPT = bool(
- os.getenv("APHRODITE_TEST_ENABLE_ARTIFICIAL_PREEMPT", False)) # noqa
- ARTIFICIAL_PREEMPTION_PROB = 0.5
- ARTIFICIAL_PREEMPTION_MAX_CNT = 500
- class PreemptionMode(enum.Enum):
- """Preemption modes.
- 1. Swapping: Swap out the blocks of the preempted sequences to CPU memory
- and swap them back in when the sequences are resumed.
- 2. Recomputation: Discard the blocks of the preempted sequences and
- recompute them when the sequences are resumed, treating the sequences as
- new prompts.
- """
- SWAP = enum.auto()
- RECOMPUTE = enum.auto()
- @dataclass
- class SchedulingBudget:
- """The available slots for scheduling.
- TODO: Right now, the budget is request_id-aware, meaning it ignores
- repeated budget updates from the same request_id. This is because in the
- normal scheduling path we update RUNNING num_seqs ahead of time, so it
- could be updated more than once when scheduling RUNNING requests. Since
- this won't happen if we only have chunked prefill scheduling, we can
- remove this feature from the API when chunked prefill is enabled by
- default.
- """
- token_budget: int
- max_num_seqs: int
- _request_ids_num_batched_tokens: Set[str] = field(default_factory=set)
- _request_ids_num_curr_seqs: Set[str] = field(default_factory=set)
- _num_batched_tokens: int = 0
- _num_curr_seqs: int = 0
- def can_schedule(self, *, num_new_tokens: int, num_new_seqs: int):
- assert num_new_tokens != 0
- assert num_new_seqs != 0
- return (self.num_batched_tokens + num_new_tokens <= self.token_budget
- and self.num_curr_seqs + num_new_seqs <= self.max_num_seqs)
- def remaining_token_budget(self):
- return self.token_budget - self.num_batched_tokens
- def add_num_batched_tokens(self, req_id: str, num_batched_tokens: int):
- if req_id in self._request_ids_num_batched_tokens:
- return
- self._request_ids_num_batched_tokens.add(req_id)
- self._num_batched_tokens += num_batched_tokens
- def subtract_num_batched_tokens(self, req_id: str,
- num_batched_tokens: int):
- if req_id in self._request_ids_num_batched_tokens:
- self._request_ids_num_batched_tokens.remove(req_id)
- self._num_batched_tokens -= num_batched_tokens
- def add_num_seqs(self, req_id: str, num_curr_seqs: int):
- if req_id in self._request_ids_num_curr_seqs:
- return
- self._request_ids_num_curr_seqs.add(req_id)
- self._num_curr_seqs += num_curr_seqs
- def subtract_num_seqs(self, req_id: str, num_curr_seqs: int):
- if req_id in self._request_ids_num_curr_seqs:
- self._request_ids_num_curr_seqs.remove(req_id)
- self._num_curr_seqs -= num_curr_seqs
- @property
- def num_batched_tokens(self):
- return self._num_batched_tokens
- @property
- def num_curr_seqs(self):
- return self._num_curr_seqs
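- # Illustrative sketch (not part of the original module): how a scheduling
- # pass might consume SchedulingBudget. The request id and numbers below
- # are hypothetical.
- def _example_budget_usage() -> None:
-     budget = SchedulingBudget(token_budget=2048, max_num_seqs=256)
-     if budget.can_schedule(num_new_tokens=512, num_new_seqs=1):
-         budget.add_num_batched_tokens("req-0", 512)
-         budget.add_num_seqs("req-0", 1)
-     # A second update for the same request id is a no-op, so RUNNING
-     # requests visited twice are not double counted.
-     budget.add_num_batched_tokens("req-0", 512)
-     assert budget.num_batched_tokens == 512
-     assert budget.remaining_token_budget() == 1536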
- @dataclass
- class ScheduledSequenceGroup:
- # A sequence group that's scheduled.
- seq_group: SequenceGroup
- # The total chunk size (number of tokens) to process for next iteration.
- # 1 for decoding. Same as prompt tokens for prefill, but if prefill is
- # chunked, it can be smaller than that.
- token_chunk_size: int
- @dataclass
- class SchedulerOutputs:
- """The scheduling decision made from a scheduler."""
- # Scheduled sequence groups.
- scheduled_seq_groups: Iterable[ScheduledSequenceGroup]
- # Number of prefill groups scheduled.
- num_prefill_groups: int
- # Total number of batched tokens.
- num_batched_tokens: int
- # Blocks to swap in. List of CPU -> GPU block number.
- blocks_to_swap_in: List[Tuple[int, int]]
- # Blocks to swap out. List of GPU -> CPU block number.
- blocks_to_swap_out: List[Tuple[int, int]]
- # Blocks to copy. Source to dest block.
- blocks_to_copy: List[Tuple[int, int]]
- # Sequence groups that are going to be ignored.
- ignored_seq_groups: List[SequenceGroup]
- # The number of slots for lookahead decoding.
- num_lookahead_slots: int
- # The number of requests in the running queue
- running_queue_size: int
- preempted: int
- def __post_init__(self):
- # Swap in and swap out should never happen at the same time.
- assert not (self.blocks_to_swap_in and self.blocks_to_swap_out)
- self.num_loras: int = len(self.lora_requests)
- if self.num_loras > 0:
- self._sort_by_lora_ids()
- self.num_prompt_adapters: int = len(self.prompt_adapter_requests)
- def is_empty(self) -> bool:
- # NOTE: We do not consider the ignored sequence groups.
- return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
- and not self.blocks_to_swap_out and not self.blocks_to_copy)
- def _sort_by_lora_ids(self):
- self.scheduled_seq_groups = sorted(
- self.scheduled_seq_groups,
- key=lambda g: (g.seq_group.lora_int_id, g.seq_group.request_id))
- @property
- def lora_requests(self) -> Set[LoRARequest]:
- return {
- g.seq_group.lora_request
- for g in self.scheduled_seq_groups
- if g.seq_group.lora_request is not None
- }
- @property
- def prompt_adapter_requests(self) -> Set[PromptAdapterRequest]:
- return {
- g.seq_group.prompt_adapter_request
- for g in self.scheduled_seq_groups
- if g.seq_group.prompt_adapter_request is not None
- }
- @dataclass
- class SchedulerRunningOutputs:
- """The requests that are scheduled from a running queue.
- Could contain prefills (i.e., chunked prefills) or decodes. If there is
- not enough memory, sequence groups can be preempted (for recomputation)
- or swapped out.
- """
- # Selected sequences that are running and in a decoding phase.
- decode_seq_groups: List[SequenceGroup]
- # Selected sequences that are running and in a prefill phase,
- # i.e., the prefill has been chunked.
- prefill_seq_groups: List[SequenceGroup]
- # The preempted sequences.
- preempted: List[SequenceGroup]
- # Sequences that are swapped out.
- swapped_out: List[SequenceGroup]
- # The blocks to swap out.
- blocks_to_swap_out: List[Tuple[int, int]]
- # The blocks to copy.
- blocks_to_copy: List[Tuple[int, int]]
- # The number of slots for lookahead decoding.
- num_lookahead_slots: int
- @classmethod
- def create_empty(cls) -> "SchedulerRunningOutputs":
- return SchedulerRunningOutputs(
- decode_seq_groups=[],
- prefill_seq_groups=[],
- preempted=[],
- swapped_out=[],
- blocks_to_swap_out=[],
- blocks_to_copy=[],
- num_lookahead_slots=0,
- )
- @dataclass
- class SchedulerSwappedInOutputs:
- """The requests that are scheduled from a swap queue.
- Could contain prefill (prefill that's chunked) or decodes.
- """
- # Selected sequences that are going to be swapped in and are in a
- # decoding phase.
- decode_seq_groups: List[SequenceGroup]
- # Selected sequences that are going to be swapped in and are in a
- # prefill phase, i.e., the prefill has been chunked.
- prefill_seq_groups: List[SequenceGroup]
- # The blocks to swap in.
- blocks_to_swap_in: List[Tuple[int, int]]
- # The blocks to copy.
- blocks_to_copy: List[Tuple[int, int]]
- # The number of slots for lookahead decoding.
- num_lookahead_slots: int
- # Infeasible sequence groups.
- infeasible_seq_groups: List[SequenceGroup]
- @classmethod
- def create_empty(cls) -> "SchedulerSwappedInOutputs":
- return SchedulerSwappedInOutputs(
- decode_seq_groups=[],
- prefill_seq_groups=[],
- blocks_to_swap_in=[],
- blocks_to_copy=[],
- num_lookahead_slots=0,
- infeasible_seq_groups=[],
- )
- @dataclass
- class SchedulerPrefillOutputs:
- """The requests that are scheduled from a waiting queue.
- Could contain fresh prefill requests or preempted requests that need
- to be recomputed from scratch.
- """
- # Selected sequences for prefill.
- seq_groups: List[SequenceGroup]
- # Ignored sequence groups.
- ignored_seq_groups: List[SequenceGroup]
- num_lookahead_slots: int
- @classmethod
- def create_empty(cls) -> "SchedulerPrefillOutputs":
- return SchedulerPrefillOutputs(
- seq_groups=[],
- ignored_seq_groups=[],
- num_lookahead_slots=0,
- )
- class Scheduler:
- def __init__(
- self,
- scheduler_config: SchedulerConfig,
- cache_config: CacheConfig,
- lora_config: Optional[LoRAConfig],
- pipeline_parallel_size: int = 1,
- ) -> None:
- self.scheduler_config = scheduler_config
- self.cache_config = cache_config
- # Note for LoRA scheduling: the current policy is extremely
- # simple and NOT fair. It can lead to starvation of some
- # LoRAs. This should be improved in the future.
- self.lora_config = lora_config
- version = "v1"
- if self.scheduler_config.use_v2_block_manager:
- version = "v2"
- if self.scheduler_config.embedding_mode:
- version = "embedding"
- BlockSpaceManagerImpl = BlockSpaceManager.get_block_space_manager_class(
- version)
- num_gpu_blocks = cache_config.num_gpu_blocks
- if num_gpu_blocks:
- num_gpu_blocks //= pipeline_parallel_size
- num_cpu_blocks = cache_config.num_cpu_blocks
- if num_cpu_blocks:
- num_cpu_blocks //= pipeline_parallel_size
- # Create the block space manager.
- self.block_manager = BlockSpaceManagerImpl(
- block_size=self.cache_config.block_size,
- num_gpu_blocks=num_gpu_blocks,
- num_cpu_blocks=num_cpu_blocks,
- sliding_window=self.cache_config.sliding_window,
- enable_caching=self.cache_config.enable_prefix_caching)
- # Sequence groups in the WAITING state.
- # Contain new prefill or preempted requests.
- self.waiting: Deque[SequenceGroup] = deque()
- # Sequence groups in the RUNNING state.
- # Contain decode requests.
- self.running: Deque[SequenceGroup] = deque()
- # Sequence groups in the SWAPPED state.
- # Contain decode requests that are swapped out.
- self.swapped: Deque[SequenceGroup] = deque()
- # Request ids of sequence groups that have finished since the last
- # scheduling step. It lets the model know that any state associated
- # with these requests can and must be released after the current step.
- # This is used to evict the finished requests from the Mamba cache.
- self._finished_requests_ids: List[str] = list()
- # Time at previous scheduling step
- self.prev_time = 0.0
- # Did we schedule a prompt at previous step?
- self.prev_prompt = False
- # Latency of the last prompt step
- self.last_prompt_latency = 0.0
- # preemption mode, RECOMPUTE or SWAP
- self.user_specified_preemption_mode = scheduler_config.preemption_mode
- # The following field is test-only. It is used to inject artificial
- # preemption.
- self.enable_artificial_preemption = ENABLE_ARTIFICIAL_PREEMPT
- self.artificial_preempt_cnt = (ARTIFICIAL_PREEMPTION_MAX_CNT
- if self.enable_artificial_preemption
- else 0)
- self.num_cumulative_preemption: int = 0
- @property
- def lora_enabled(self) -> bool:
- return bool(self.lora_config)
- @property
- def num_decoding_tokens_per_seq(self) -> int:
- """The number of new tokens."""
- return 1
- def add_seq_group(self, seq_group: SequenceGroup) -> None:
- # Add sequence groups to the waiting queue.
- self.waiting.append(seq_group)
- def _add_seq_group_to_running(self, seq_group: SequenceGroup) -> None:
- # Add sequence groups to the running queue.
- # Only for testing purposes.
- self.running.append(seq_group)
- def _add_seq_group_to_swapped(self, seq_group: SequenceGroup) -> None:
- # Add sequence groups to the swapped queue.
- # Only for testing purposes.
- self.swapped.append(seq_group)
- def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
- """Aborts a sequence group with the given ID.
- Check if the sequence group with the given ID
- is present in any of the state queues.
- If present, remove the sequence group from that state queue.
- Also, if any sequence in the sequence group is not finished,
- free the sequence with status `FINISHED_ABORTED`.
- Otherwise, do nothing.
- Args:
- request_id: The ID(s) of the sequence group to abort.
- """
- if isinstance(request_id, str):
- request_id = (request_id, )
- request_ids = set(request_id)
- for state_queue in [self.waiting, self.running, self.swapped]:
- aborted_groups: List[SequenceGroup] = []
- for seq_group in state_queue:
- if not request_ids:
- # Using 'break' here may add two extra iterations,
- # but is acceptable to reduce complexity.
- break
- if seq_group.request_id in request_ids:
- # Appending aborted group into pending list.
- aborted_groups.append(seq_group)
- request_ids.remove(seq_group.request_id)
- for aborted_group in aborted_groups:
- # Remove the sequence group from the state queue.
- state_queue.remove(aborted_group)
- # Remove the aborted request from the Mamba cache.
- self._finished_requests_ids.append(aborted_group.request_id)
- for seq in aborted_group.get_seqs():
- if seq.is_finished():
- continue
- seq.status = SequenceStatus.FINISHED_ABORTED
- self.free_seq(seq)
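- # Example usage (illustrative request ids): a single id or any iterable
- # of ids can be aborted in one call, e.g.
- # scheduler.abort_seq_group("request-0") or
- # scheduler.abort_seq_group(["request-1", "request-2"]).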
- def has_unfinished_seqs(self) -> bool:
- return len(self.waiting) != 0 or len(self.running) != 0 or len(
- self.swapped) != 0
- def get_num_unfinished_seq_groups(self) -> int:
- return len(self.waiting) + len(self.running) + len(self.swapped)
- def get_and_reset_finished_requests_ids(self) -> List[str]:
- """Flushes the list of request ids of previously finished seq_groups."""
- finished_requests_ids = self._finished_requests_ids
- self._finished_requests_ids = list()
- return finished_requests_ids
- def _schedule_running(
- self,
- budget: SchedulingBudget,
- curr_loras: Optional[Set[int]],
- enable_chunking: bool = False,
- ) -> SchedulerRunningOutputs:
- """Schedule sequence groups that are running.
- Running queue should include decode and chunked prefill requests.
- Args:
- budget: The scheduling budget. The argument is in-place updated
- when any decodes are preempted.
- curr_loras: Currently batched lora request ids. The argument is
- in-place updated when any decodes are preempted.
- enable_chunking: If True, seq group can be chunked and only a
- chunked number of tokens are scheduled if
- `budget.num_batched_tokens` does not have enough capacity to
- schedule all tokens.
-
- Returns:
- SchedulerRunningOutputs.
- """
- # Blocks that need to be swapped or copied before model execution.
- blocks_to_swap_out: List[Tuple[int, int]] = []
- blocks_to_copy: List[Tuple[int, int]] = []
- decode_seq_groups: List[ScheduledSequenceGroup] = []
- prefill_seq_groups: List[ScheduledSequenceGroup] = []
- preempted: List[SequenceGroup] = []
- swapped_out: List[SequenceGroup] = []
- # NOTE: Preemption happens only when there is no available slot
- # to keep all the sequence groups in the RUNNING state.
- running_queue = self.running
- while running_queue:
- seq_group = running_queue[0]
- num_running_tokens = self._get_num_new_tokens(
- seq_group, SequenceStatus.RUNNING, enable_chunking, budget)
- if num_running_tokens == 0:
- break
- running_queue.popleft()
- while not self._can_append_slots(seq_group):
- budget.subtract_num_batched_tokens(seq_group.request_id,
- num_running_tokens)
- num_running_seqs = seq_group.get_max_num_running_seqs()
- budget.subtract_num_seqs(seq_group.request_id,
- num_running_seqs)
- if (curr_loras is not None and seq_group.lora_int_id > 0
- and seq_group.lora_int_id in curr_loras):
- curr_loras.remove(seq_group.lora_int_id)
- if running_queue:
- # Preempt the lowest-priority sequence groups.
- victim_seq_group = running_queue.pop()
- preempted_mode = self._preempt(victim_seq_group,
- blocks_to_swap_out)
- if preempted_mode == PreemptionMode.RECOMPUTE:
- preempted.append(victim_seq_group)
- else:
- swapped_out.append(victim_seq_group)
- else:
- # No other sequence groups can be preempted.
- # Preempt the current sequence group.
- preempted_mode = self._preempt(seq_group,
- blocks_to_swap_out)
- if preempted_mode == PreemptionMode.RECOMPUTE:
- preempted.append(seq_group)
- else:
- swapped_out.append(seq_group)
- break
- else:
- self._append_slots(seq_group, blocks_to_copy)
- is_prefill = seq_group.is_prefill()
- if is_prefill:
- prefill_seq_groups.append(
- ScheduledSequenceGroup(
- seq_group=seq_group,
- token_chunk_size=num_running_tokens))
- else:
- decode_seq_groups.append(
- ScheduledSequenceGroup(seq_group=seq_group,
- token_chunk_size=1))
- budget.add_num_batched_tokens(seq_group.request_id,
- num_running_tokens)
- # OPTIMIZATION: Note that get_max_num_running_seqs is
- # expensive. For the default scheduling case where
- # enable_chunking is False, num_seqs is updated before running
- # this method, so we don't have to update it again here.
- if enable_chunking:
- num_running_seqs = seq_group.get_max_num_running_seqs()
- budget.add_num_seqs(seq_group.request_id, num_running_seqs)
- if curr_loras is not None and seq_group.lora_int_id > 0:
- curr_loras.add(seq_group.lora_int_id)
- return SchedulerRunningOutputs(
- decode_seq_groups=decode_seq_groups,
- prefill_seq_groups=prefill_seq_groups,
- preempted=preempted,
- swapped_out=swapped_out,
- blocks_to_swap_out=blocks_to_swap_out,
- blocks_to_copy=blocks_to_copy,
- num_lookahead_slots=self._get_num_lookahead_slots(
- is_prefill=False))
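- # Note on the loop above: when slots cannot be appended for the group at
- # the head of the running queue, its budget contribution is rolled back
- # and victims are popped from the back of the queue (the most recently
- # added groups) until space is available; if no other group is left to
- # evict, the group preempts itself and is not scheduled this step.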
- def _schedule_swapped(
- self,
- budget: SchedulingBudget,
- curr_loras: Optional[Set[int]],
- enable_chunking: bool = False,
- ) -> SchedulerSwappedInOutputs:
- """Schedule sequence groups that are swapped out.
- It schedules swapped requests as long as they fit within `budget`
- and the number of active LoRAs stays within `max_loras` from the
- LoRA config. The input arguments `budget` and `curr_loras` are
- updated based on scheduled seq_groups.
- Args:
- budget: The scheduling budget. The argument is in-place updated
- when any requests are swapped in.
- curr_loras: Currently batched lora request ids. The argument is
- in-place updated when any requests are swapped in.
- enable_chunking: If True, seq group can be chunked and only a
- chunked number of tokens are scheduled if
- `budget.num_batched_tokens` does not have enough capacity to
- schedule all tokens.
- Returns:
- SchedulerSwappedInOutputs.
- """
- # Blocks that need to be swapped or copied before model execution.
- blocks_to_swap_in: List[Tuple[int, int]] = []
- blocks_to_copy: List[Tuple[int, int]] = []
- decode_seq_groups: List[ScheduledSequenceGroup] = []
- prefill_seq_groups: List[ScheduledSequenceGroup] = []
- infeasible_seq_groups: List[SequenceGroup] = []
- swapped_queue = self.swapped
- leftover_swapped: Deque[SequenceGroup] = deque()
- while swapped_queue:
- seq_group = swapped_queue[0]
- # If the sequence group cannot be swapped in, stop.
- is_prefill = seq_group.is_prefill()
- alloc_status = self.block_manager.can_swap_in(
- seq_group, self._get_num_lookahead_slots(is_prefill))
- if alloc_status == AllocStatus.LATER:
- break
- elif alloc_status == AllocStatus.NEVER:
- logger.warning(f"Failing the request {seq_group.request_id} "
- "because there's not enough kv cache blocks to "
- "run the entire sequence.")
- for seq in seq_group.get_seqs():
- seq.status = SequenceStatus.FINISHED_IGNORED
- infeasible_seq_groups.append(seq_group)
- swapped_queue.popleft()
- continue
- lora_int_id = 0
- if self.lora_enabled:
- lora_int_id = seq_group.lora_int_id
- assert curr_loras is not None
- assert self.lora_config is not None
- if (lora_int_id > 0 and (lora_int_id not in curr_loras)
- and len(curr_loras) >= self.lora_config.max_loras):
- # We don't have space for another LoRA, so
- # we ignore this request for now.
- leftover_swapped.appendleft(seq_group)
- swapped_queue.popleft()
- continue
- # The total number of sequences in the RUNNING state should not
- # exceed the maximum number of sequences.
- num_new_seqs = seq_group.get_max_num_running_seqs()
- num_new_tokens = self._get_num_new_tokens(seq_group,
- SequenceStatus.SWAPPED,
- enable_chunking, budget)
- if (num_new_tokens == 0
- or not budget.can_schedule(num_new_tokens=num_new_tokens,
- num_new_seqs=num_new_seqs)):
- break
- if lora_int_id > 0 and curr_loras is not None:
- curr_loras.add(lora_int_id)
- swapped_queue.popleft()
- self._swap_in(seq_group, blocks_to_swap_in)
- self._append_slots(seq_group, blocks_to_copy)
- is_prefill = seq_group.is_prefill()
- if is_prefill:
- prefill_seq_groups.append(
- ScheduledSequenceGroup(seq_group,
- token_chunk_size=num_new_tokens))
- else:
- decode_seq_groups.append(
- ScheduledSequenceGroup(seq_group, token_chunk_size=1))
- budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
- budget.add_num_seqs(seq_group.request_id, num_new_seqs)
- swapped_queue.extendleft(leftover_swapped)
- return SchedulerSwappedInOutputs(
- decode_seq_groups=decode_seq_groups,
- prefill_seq_groups=prefill_seq_groups,
- blocks_to_swap_in=blocks_to_swap_in,
- blocks_to_copy=blocks_to_copy,
- num_lookahead_slots=self._get_num_lookahead_slots(
- is_prefill=False),
- infeasible_seq_groups=infeasible_seq_groups,
- )
- def _get_prompt_limit(self, seq_group: SequenceGroup) -> int:
- if self.scheduler_config.chunked_prefill_enabled:
- prompt_limit = self.scheduler_config.max_model_len
- else:
- prompt_limit = min(self.scheduler_config.max_model_len,
- self.scheduler_config.max_num_batched_tokens)
- # Model is fine-tuned with long context. Return the fine-tuned max_len.
- if (seq_group.lora_request
- and seq_group.lora_request.long_lora_max_len):
- assert prompt_limit <= seq_group.lora_request.long_lora_max_len
- return seq_group.lora_request.long_lora_max_len
- else:
- return prompt_limit
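- # Worked example (illustrative numbers): with max_model_len=4096 and
- # max_num_batched_tokens=2048, the non-chunked path limits prompts to
- # min(4096, 2048) = 2048 tokens, while chunked prefill allows the full
- # 4096 because a long prompt can be processed over multiple steps. A
- # long-context LoRA request overrides this with its own max length.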
- def _schedule_prefills(
- self,
- budget: SchedulingBudget,
- curr_loras: Optional[Set[int]],
- enable_chunking: bool = False,
- ) -> SchedulerPrefillOutputs:
- """Schedule sequence groups that are in prefill stage.
- Note that the current scheduler treats PREEMPTED_FOR_RECOMPUTE
- as a new prefill (recomputed from the beginning up to the most
- recently generated tokens).
- It schedules waiting requests as long as they fit within `budget`
- and the number of active LoRAs stays within `max_loras` from the
- LoRA config. The input arguments `budget` and `curr_loras` are
- updated based on scheduled seq_groups.
- Args:
- budget: The scheduling budget. The argument is in-place updated
- when any requests are scheduled.
- curr_loras: Currently batched lora request ids. The argument is
- in-place updated when any requests are scheduled.
- enable_chunking: If True, seq group can be chunked and only a
- chunked number of tokens are scheduled if
- `budget.num_batched_tokens` does not have enough capacity to
- schedule all tokens.
- Returns:
- SchedulerPrefillOutputs.
- """
- ignored_seq_groups: List[SequenceGroup] = []
- seq_groups: List[SequenceGroup] = []
- waiting_queue = self.waiting
- leftover_waiting_sequences: Deque[SequenceGroup] = deque()
- while self._passed_delay(time.time()) and waiting_queue:
- seq_group = waiting_queue[0]
- waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
- assert len(waiting_seqs) == 1, (
- "Waiting sequence group should have only one prompt "
- "sequence.")
- num_new_tokens = self._get_num_new_tokens(seq_group,
- SequenceStatus.WAITING,
- enable_chunking, budget)
- if not enable_chunking:
- num_prompt_tokens = waiting_seqs[0].get_len()
- assert num_new_tokens == num_prompt_tokens
- prompt_limit = self._get_prompt_limit(seq_group)
- if num_new_tokens > prompt_limit:
- logger.warning(
- f"Input prompt ({num_new_tokens} tokens) is too long "
- f"and exceeds the limit of {prompt_limit}.")
- for seq in waiting_seqs:
- seq.status = SequenceStatus.FINISHED_IGNORED
- ignored_seq_groups.append(seq_group)
- waiting_queue.popleft()
- continue
- # If the sequence group cannot be allocated, stop.
- can_allocate = self.block_manager.can_allocate(seq_group)
- if can_allocate == AllocStatus.LATER:
- break
- elif can_allocate == AllocStatus.NEVER:
- logger.warning(
- f"Input prompt ({num_new_tokens} tokens) is too long "
- "and exceeds the capacity of block_manager.")
- for seq in waiting_seqs:
- seq.status = SequenceStatus.FINISHED_IGNORED
- ignored_seq_groups.append(seq_group)
- waiting_queue.popleft()
- continue
- lora_int_id = 0
- if self.lora_enabled:
- lora_int_id = seq_group.lora_int_id
- assert curr_loras is not None
- assert self.lora_config is not None
- if (self.lora_enabled and lora_int_id > 0
- and lora_int_id not in curr_loras
- and len(curr_loras) >= self.lora_config.max_loras):
- # We don't have space for another LoRA, so
- # we ignore this request for now.
- leftover_waiting_sequences.appendleft(seq_group)
- waiting_queue.popleft()
- continue
- num_new_seqs = seq_group.get_max_num_running_seqs()
- if (num_new_tokens == 0
- or not budget.can_schedule(num_new_tokens=num_new_tokens,
- num_new_seqs=num_new_seqs)):
- break
- # Can schedule this request.
- if curr_loras is not None and lora_int_id > 0:
- curr_loras.add(lora_int_id)
- waiting_queue.popleft()
- self._allocate_and_set_running(seq_group)
- seq_groups.append(
- ScheduledSequenceGroup(seq_group=seq_group,
- token_chunk_size=num_new_tokens))
- budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
- budget.add_num_seqs(seq_group.request_id, num_new_seqs)
- # Queue requests that couldn't be scheduled.
- waiting_queue.extendleft(leftover_waiting_sequences)
- if len(seq_groups) > 0:
- self.prev_prompt = True
- return SchedulerPrefillOutputs(
- seq_groups=seq_groups,
- ignored_seq_groups=ignored_seq_groups,
- num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True))
- def _schedule_default(self) -> SchedulerOutputs:
- """Schedule queued requests.
-
- The current policy is designed to optimize throughput. First, it
- batches as many prefill requests as possible, then it schedules
- decodes. If there is pressure on GPU memory, decode requests can
- be swapped out or preempted.
- """
- # Include running requests to the budget.
- budget = SchedulingBudget(
- token_budget=self.scheduler_config.max_num_batched_tokens,
- max_num_seqs=self.scheduler_config.max_num_seqs,
- )
- # Make sure we include num running seqs before scheduling prefill,
- # so that we don't schedule beyond max_num_seqs for prefill.
- for seq_group in self.running:
- budget.add_num_seqs(seq_group.request_id,
- seq_group.get_max_num_running_seqs())
- curr_loras = set(
- seq_group.lora_int_id for seq_group in self.running
- if seq_group.lora_int_id > 0) if self.lora_enabled else None
- prefills = SchedulerPrefillOutputs.create_empty()
- running_scheduled = SchedulerRunningOutputs.create_empty()
- swapped_in = SchedulerSwappedInOutputs.create_empty()
- # If any requests are swapped, prioritize the swapped requests.
- if not self.swapped:
- prefills = self._schedule_prefills(budget,
- curr_loras,
- enable_chunking=False)
- # Don't schedule decodes if prefills are scheduled.
- # NOTE: If `_schedule_prefills` doesn't enable chunking, self.running
- # only contains decode requests, not chunked prefills.
- if len(prefills.seq_groups) == 0:
- running_scheduled = self._schedule_running(budget,
- curr_loras,
- enable_chunking=False)
- # If any sequence group is preempted, do not swap in any sequence
- # group, because it means there is no slot for new running requests.
- if len(running_scheduled.preempted) + len(
- running_scheduled.swapped_out) == 0:
- swapped_in = self._schedule_swapped(budget, curr_loras)
- assert (budget.num_batched_tokens <=
- self.scheduler_config.max_num_batched_tokens)
- assert budget.num_curr_seqs <= self.scheduler_config.max_num_seqs
- # Update waiting requests.
- self.waiting.extendleft(running_scheduled.preempted)
- # Update new running requests.
- self.running.extend([s.seq_group for s in prefills.seq_groups])
- self.running.extend(
- [s.seq_group for s in running_scheduled.decode_seq_groups])
- self.running.extend(
- [s.seq_group for s in swapped_in.decode_seq_groups])
- # Update swapped requests.
- self.swapped.extend(running_scheduled.swapped_out)
- preempted = (len(running_scheduled.preempted) +
- len(running_scheduled.swapped_out))
- # There should be no prefill from running queue because this policy
- # doesn't allow chunked prefills.
- assert len(running_scheduled.prefill_seq_groups) == 0
- assert len(swapped_in.prefill_seq_groups) == 0
- return SchedulerOutputs(
- scheduled_seq_groups=(prefills.seq_groups +
- running_scheduled.decode_seq_groups +
- swapped_in.decode_seq_groups),
- num_prefill_groups=len(prefills.seq_groups),
- num_batched_tokens=budget.num_batched_tokens,
- blocks_to_swap_in=swapped_in.blocks_to_swap_in,
- blocks_to_swap_out=running_scheduled.blocks_to_swap_out,
- blocks_to_copy=running_scheduled.blocks_to_copy +
- swapped_in.blocks_to_copy,
- ignored_seq_groups=prefills.ignored_seq_groups +
- swapped_in.infeasible_seq_groups,
- num_lookahead_slots=running_scheduled.num_lookahead_slots,
- running_queue_size=len(self.running),
- preempted=preempted,
- )
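- # NOTE: scheduled_seq_groups places prefill groups before decode groups;
- # schedule() below relies on this ordering (prefill < decoding) when
- # building the per-step metadata.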
- def _schedule_chunked_prefill(self):
- """Schedule queued requests.
-
- Chunked prefill allows chunking prefill requests and batching them
- together with decode requests. This policy 1. schedules as many
- decode requests as possible, 2. schedules chunked prefill requests
- that are not finished, 3. schedules swapped requests, and
- 4. schedules new prefill requests.
- The policy sustains high GPU utilization because it can put prefill
- and decode requests in the same batch, while improving inter-token
- latency because decode requests are not blocked by prefill requests.
- """
- budget = SchedulingBudget(
- token_budget=self.scheduler_config.max_num_batched_tokens,
- max_num_seqs=self.scheduler_config.max_num_seqs,
- )
- curr_loras: Set[int] = set()
- prefills = SchedulerPrefillOutputs.create_empty()
- swapped_in = SchedulerSwappedInOutputs.create_empty()
- # Decode requests should always be scheduled first (FCFS).
- running_scheduled = self._schedule_running(budget,
- curr_loras,
- enable_chunking=True)
- # Schedule swapped out requests.
- # If preemption happens, it means we don't have space for swap-in.
- if len(running_scheduled.preempted) + len(
- running_scheduled.swapped_out) == 0:
- swapped_in = self._schedule_swapped(budget, curr_loras)
- # Schedule new prefills.
- prefills = self._schedule_prefills(budget,
- curr_loras,
- enable_chunking=True)
- assert (budget.num_batched_tokens <=
- self.scheduler_config.max_num_batched_tokens)
- assert budget.num_curr_seqs <= self.scheduler_config.max_num_seqs
- # Update waiting requests.
- self.waiting.extendleft(running_scheduled.preempted)
- # Update new running requests.
- self.running.extend([s.seq_group for s in prefills.seq_groups])
- self.running.extend(
- [s.seq_group for s in running_scheduled.decode_seq_groups])
- self.running.extend(
- [s.seq_group for s in running_scheduled.prefill_seq_groups])
- self.running.extend(
- [s.seq_group for s in swapped_in.decode_seq_groups])
- self.running.extend(
- [s.seq_group for s in swapped_in.prefill_seq_groups])
- # Update swapped requests.
- self.swapped.extend(running_scheduled.swapped_out)
- return SchedulerOutputs(
- scheduled_seq_groups=(prefills.seq_groups +
- running_scheduled.prefill_seq_groups +
- swapped_in.prefill_seq_groups +
- running_scheduled.decode_seq_groups +
- swapped_in.decode_seq_groups),
- num_prefill_groups=(len(prefills.seq_groups) +
- len(swapped_in.prefill_seq_groups) +
- len(running_scheduled.prefill_seq_groups)),
- num_batched_tokens=budget.num_batched_tokens,
- blocks_to_swap_in=swapped_in.blocks_to_swap_in,
- blocks_to_swap_out=running_scheduled.blocks_to_swap_out,
- blocks_to_copy=running_scheduled.blocks_to_copy +
- swapped_in.blocks_to_copy,
- ignored_seq_groups=prefills.ignored_seq_groups +
- swapped_in.infeasible_seq_groups,
- num_lookahead_slots=running_scheduled.num_lookahead_slots,
- running_queue_size=len(self.running),
- preempted=(len(running_scheduled.preempted) +
- len(running_scheduled.swapped_out)),
- )
- def _schedule(self) -> SchedulerOutputs:
- """Schedule queued requests."""
- if self.scheduler_config.chunked_prefill_enabled:
- return self._schedule_chunked_prefill()
- else:
- return self._schedule_default()
- def _can_append_slots(self, seq_group: SequenceGroup) -> bool:
- """Determine whether or not we have enough space in the KV cache to
- continue generation of the sequence group.
- """
- # It is True only for testing case to trigger artificial preemption.
- if (self.enable_artificial_preemption
- and random.uniform(0, 1) < ARTIFICIAL_PREEMPTION_PROB
- and self.artificial_preempt_cnt > 0):
- self.artificial_preempt_cnt -= 1
- return False
- # Appending slots only occurs in decoding.
- is_prefill = False
- return self.block_manager.can_append_slots(
- seq_group=seq_group,
- num_lookahead_slots=self._get_num_lookahead_slots(is_prefill),
- )
- def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
- # Schedule sequence groups.
- # This function call changes the internal states of the scheduler
- # such as self.running, self.swapped, and self.waiting.
- scheduler_outputs = self._schedule()
- now = time.time()
- # Create input data structures.
- seq_group_metadata_list: List[SequenceGroupMetadata] = []
- for i, scheduled_seq_group in enumerate(
- scheduler_outputs.scheduled_seq_groups):
- seq_group = scheduled_seq_group.seq_group
- token_chunk_size = scheduled_seq_group.token_chunk_size
- seq_group.maybe_set_first_scheduled_time(now)
- # seq_id -> SequenceData
- seq_data: Dict[int, SequenceData] = {}
- # seq_id -> physical block numbers
- block_tables: Dict[int, List[int]] = {}
- for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
- seq_id = seq.seq_id
- seq_data[seq_id] = seq.data
- block_tables[seq_id] = self.block_manager.get_block_table(seq)
- self.block_manager.access_all_blocks_in_seq(seq, now)
- common_computed_block_nums = (
- self.block_manager.get_common_computed_block_ids(
- seq_group.get_seqs(status=SequenceStatus.RUNNING)))
- do_sample = True
- if seq_group.is_prefill():
- seqs = seq_group.get_seqs()
- # Prefill has only 1 sequence.
- assert len(seqs) == 1
- # If not all prompt tokens will be computed in the next iteration,
- # the prefill is chunked, and we don't need sampling.
- # NOTE: We use get_len instead of get_prompt_len because when
- # a sequence is preempted, prefill includes previously generated
- # output tokens.
- if (token_chunk_size + seqs[0].data.get_num_computed_tokens() <
- seqs[0].data.get_len()):
- do_sample = False
- # This assumes that scheduled_seq_groups is ordered with prefill
- # groups before decode groups.
- is_prompt = seq_group.is_prefill()
- seq_group_metadata = SequenceGroupMetadata(
- request_id=seq_group.request_id,
- is_prompt=is_prompt,
- seq_data=seq_data,
- sampling_params=seq_group.sampling_params,
- block_tables=block_tables,
- do_sample=do_sample,
- pooling_params=seq_group.pooling_params,
- token_chunk_size=token_chunk_size,
- lora_request=seq_group.lora_request,
- computed_block_nums=common_computed_block_nums,
- # `multi_modal_data` will only be present for the 1st comm
- # between engine and worker.
- # the subsequent comms can still use delta, but
- # `multi_modal_data` will be None.
- multi_modal_data=seq_group.multi_modal_data
- if scheduler_outputs.num_prefill_groups > 0 else None,
- prompt_adapter_request=seq_group.prompt_adapter_request,
- )
- seq_group_metadata_list.append(seq_group_metadata)
- # Now that the batch has been created, we can assume all blocks in the
- # batch will have been computed before the next scheduling invocation.
- # This is because the engine assumes that a failure in model execution
- # will crash the Aphrodite instance / will not retry.
- for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
- self.block_manager.mark_blocks_as_computed(
- scheduled_seq_group.seq_group)
- return seq_group_metadata_list, scheduler_outputs
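- # Illustrative sketch (hypothetical driver, not part of this module): how
- # an engine step might consume the scheduler.
- #
- #     scheduler.add_seq_group(seq_group)
- #     seq_group_metadata_list, scheduler_outputs = scheduler.schedule()
- #     # ... execute the model over seq_group_metadata_list ...
- #     scheduler.free_finished_seq_groups()
- #     finished_ids = scheduler.get_and_reset_finished_requests_ids()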
- def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
- self.block_manager.fork(parent_seq, child_seq)
- def free_seq(self, seq: Sequence) -> None:
- """Free a sequence from a block table."""
- self.block_manager.free(seq)
- def free_finished_seq_groups(self) -> None:
- remaining: Deque[SequenceGroup] = deque()
- for seq_group in self.running:
- if seq_group.is_finished():
- # Add the finished requests to the finished requests list.
- # This list will be used to update the Mamba cache in the
- # next step.
- self._finished_requests_ids.append(seq_group.request_id)
- else:
- remaining.append(seq_group)
- self.running = remaining
- def _allocate_and_set_running(self, seq_group: SequenceGroup) -> None:
- self.block_manager.allocate(seq_group)
- for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
- seq.status = SequenceStatus.RUNNING
- def _append_slots(
- self,
- seq_group: SequenceGroup,
- blocks_to_copy: List[Tuple[int, int]],
- ) -> None:
- """Appends new slots to the sequences in the given sequence group.
- Args:
- seq_group (SequenceGroup): The sequence group containing the
- sequences to append slots to.
- blocks_to_copy (List[Tuple[int, int]]): A list of tuple of two
- ints, the first int is the source block index, and the second
- int is the destination block index. This list is updated with
- the new source and destination block indices for the appended
- slots.
- """
- num_lookahead_slots = self._get_num_lookahead_slots(is_prefill=False)
- for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
- cows = self.block_manager.append_slots(seq, num_lookahead_slots)
- blocks_to_copy.extend(cows)
- def _preempt(
- self,
- seq_group: SequenceGroup,
- blocks_to_swap_out: List[Tuple[int, int]],
- preemption_mode: Optional[PreemptionMode] = None,
- ) -> PreemptionMode:
- # If preemption mode is not specified, we determine the mode as follows:
- # We use recomputation by default since it incurs lower overhead than
- # swapping. However, when the sequence group has multiple sequences
- # (e.g., beam search), recomputation is not currently supported. In
- # such a case, we use swapping instead.
- # FIXME: This makes our scheduling policy a bit bizarre.
- # As swapped sequences are prioritized over waiting sequences,
- # sequence groups with multiple sequences are implicitly prioritized
- # over sequence groups with a single sequence.
- # TODO: Support recomputation for sequence groups with multiple
- # sequences. This may require a more sophisticated CUDA kernel.
- if self.user_specified_preemption_mode is None:
- if seq_group.get_max_num_running_seqs() == 1:
- preemption_mode = PreemptionMode.RECOMPUTE
- else:
- preemption_mode = PreemptionMode.SWAP
- elif self.user_specified_preemption_mode == "swap":
- preemption_mode = PreemptionMode.SWAP
- else:
- preemption_mode = PreemptionMode.RECOMPUTE
- if self.num_cumulative_preemption % 50 == 0:
- logger.warning(
- f"Sequence group {seq_group.request_id} is preempted by "
- f"{preemption_mode} mode because there is "
- "not enough KV cache space. This can affect the end-to-end "
- "performance. Increase gpu_memory_utilization or "
- "tensor_parallel_size to provide more KV cache memory. "
- "total_num_cumulative_preemption="
- f"{self.num_cumulative_preemption + 1}")
- self.num_cumulative_preemption += 1
- if preemption_mode == PreemptionMode.RECOMPUTE:
- self._preempt_by_recompute(seq_group)
- elif preemption_mode == PreemptionMode.SWAP:
- self._preempt_by_swap(seq_group, blocks_to_swap_out)
- else:
- raise AssertionError("Invalid preemption mode.")
- return preemption_mode
- def _preempt_by_recompute(
- self,
- seq_group: SequenceGroup,
- ) -> None:
- seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
- assert len(seqs) == 1
- for seq in seqs:
- seq.status = SequenceStatus.WAITING
- self.free_seq(seq)
- seq.reset_state_for_recompute()
- def _preempt_by_swap(
- self,
- seq_group: SequenceGroup,
- blocks_to_swap_out: List[Tuple[int, int]],
- ) -> None:
- self._swap_out(seq_group, blocks_to_swap_out)
- def _swap_in(
- self,
- seq_group: SequenceGroup,
- blocks_to_swap_in: List[Tuple[int, int]],
- ) -> None:
- mapping = self.block_manager.swap_in(seq_group)
- blocks_to_swap_in.extend(mapping)
- for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
- seq.status = SequenceStatus.RUNNING
- def _swap_out(
- self,
- seq_group: SequenceGroup,
- blocks_to_swap_out: List[Tuple[int, int]],
- ) -> None:
- if not self.block_manager.can_swap_out(seq_group):
- # FIXME: Abort the sequence group instead of aborting the
- # entire engine.
- raise RuntimeError(
- "Aborted due to the lack of CPU swap space. Please increase "
- "the swap space to avoid this error.")
- mapping = self.block_manager.swap_out(seq_group)
- blocks_to_swap_out.extend(mapping)
- for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
- seq.status = SequenceStatus.SWAPPED
- def _passed_delay(self, now: float) -> bool:
- if self.prev_prompt:
- self.last_prompt_latency = now - self.prev_time
- self.prev_time, self.prev_prompt = now, False
- # Delay scheduling prompts to let the waiting queue fill up.
- if self.scheduler_config.delay_factor > 0 and self.waiting:
- earliest_arrival_time = min(
- [e.metrics.arrival_time for e in self.waiting])
- passed_delay = (
- (now - earliest_arrival_time) >
- (self.scheduler_config.delay_factor * self.last_prompt_latency)
- or not self.running)
- else:
- passed_delay = True
- return passed_delay
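- # Worked example (illustrative numbers): with delay_factor=0.5 and a last
- # prompt step latency of 0.2s, new prompts are admitted only once the
- # oldest waiting request has been queued for more than 0.1s, or when the
- # running queue is empty. A delay_factor of 0 disables this delay.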
- def _get_num_lookahead_slots(self, is_prefill: bool) -> int:
- """The number of slots to allocate per sequence per step, beyond known
- token ids. Speculative decoding uses these slots to store KV activations
- of tokens which may or may not be accepted.
- Speculative decoding does not yet support prefill, so we do not perform
- lookahead allocation for prefill.
- """
- if is_prefill:
- return 0
- return self.scheduler_config.num_lookahead_slots
- def _get_num_new_tokens(self, seq_group: SequenceGroup,
- status: SequenceStatus, enable_chunking: bool,
- budget: SchedulingBudget) -> int:
- """Get the next new tokens to compute for a given sequence group
- that's in a given `status`.
- The API could chunk the number of tokens to compute based on `budget`
- if `enable_chunking` is True. If a sequence group has multiple
- sequences (e.g., running beam search), it means it is in decoding
- phase, so chunking doesn't happen.
- Returns 0 if the new token cannot be computed due to token budget.
- """
- num_new_tokens = 0
- seqs = seq_group.get_seqs(status=status)
- for seq in seqs:
- num_new_tokens += seq.get_num_new_tokens()
- assert num_new_tokens > 0
- # Chunk if a running request cannot fit within the budget.
- # If the number of seqs > 1, the group is doing beam search in
- # the decode phase; do not chunk in that case.
- if enable_chunking and len(seqs) == 1:
- num_new_tokens = min(num_new_tokens,
- budget.remaining_token_budget())
- return num_new_tokens
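- # Worked example (illustrative numbers): a waiting prefill with 1000
- # uncomputed prompt tokens and budget.remaining_token_budget() == 256 is
- # chunked down to 256 tokens for this step when enable_chunking is True;
- # with chunking disabled, the full 1000 tokens are returned and the
- # caller's budget check decides whether the group can be scheduled.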