123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852 |
- import time
- from collections import deque
- from typing import List, Set, Tuple
- from unittest.mock import MagicMock
- import pytest # noqa
- from aphrodite.common.config import CacheConfig, LoRAConfig, SchedulerConfig
- from aphrodite.common.sequence import SequenceGroup, SequenceStatus
- from aphrodite.lora.request import LoRARequest
- from aphrodite.processing.interfaces import AllocStatus
- from aphrodite.processing.scheduler import Scheduler, SchedulingBudget
- from .utils import (append_new_token, append_new_token_seq_group,
- create_dummy_prompt, get_sequence_groups,
- schedule_and_update_computed_tokens)
- def test_scheduler_add_seq_group():
- block_size = 4
- scheduler_config = SchedulerConfig(100, 64, 1)
- cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto",)
- cache_config.num_cpu_blocks = 4
- cache_config.num_gpu_blocks = 4
- scheduler = Scheduler(scheduler_config, cache_config, None)
- # Add seq group to scheduler.
- num_seq_group = 4
- for i in range(num_seq_group):
- _, seq_group = create_dummy_prompt(str(i), block_size)
- scheduler.add_seq_group(seq_group)
- assert scheduler.get_num_unfinished_seq_groups() == i + 1
- def test_scheduler_abort_seq_group():
- block_size = 4
- scheduler_config = SchedulerConfig(100, 64, 1)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 4
- cache_config.num_gpu_blocks = 4
- scheduler = Scheduler(scheduler_config, cache_config, None)
- # Add multiple seq groups to scheduler.
- num_seq_group = 4
- request_ids: Set[str] = set()
- for i in range(num_seq_group):
- _, seq_group = create_dummy_prompt(str(i), block_size)
- scheduler.add_seq_group(seq_group)
- request_ids.add(str(i))
- # Abort all added seq groups.
- assert scheduler.get_num_unfinished_seq_groups() == num_seq_group
- scheduler.abort_seq_group(request_ids)
- assert scheduler.get_num_unfinished_seq_groups() == 0
- def test_scheduler_schedule_simple():
- block_size = 4
- num_seq_group = 4
- max_model_len = 16
- scheduler_config = SchedulerConfig(64, num_seq_group, max_model_len)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 8
- cache_config.num_gpu_blocks = 8
- scheduler = Scheduler(scheduler_config, cache_config, None)
- running: List[SequenceGroup] = []
- # Add seq groups to scheduler.
- for i in range(num_seq_group):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=block_size)
- scheduler.add_seq_group(seq_group)
- running.append(seq_group)
- # Schedule seq groups prompts.
- num_tokens = block_size * num_seq_group
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert set(get_sequence_groups(out)) == set(running)
- assert out.num_batched_tokens == num_tokens
- assert (not out.blocks_to_copy and not out.blocks_to_swap_in
- and not out.blocks_to_swap_out)
- assert len(seq_group_meta) == num_seq_group
- append_new_token(out, 1)
- # Schedule seq groups generation.
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert set(get_sequence_groups(out)) == set(running)
- assert out.num_batched_tokens == num_seq_group
- assert (not out.blocks_to_copy and not out.blocks_to_swap_in
- and not out.blocks_to_swap_out)
- assert len(seq_group_meta) == num_seq_group
- append_new_token(out, 1)
- def test_scheduler_prefill_prioritized():
- """Verify running batched tokens are not applied to prefill requests."""
- block_size = 4
- max_model_len = 30
- max_batched_num_tokens = 30
- scheduler_config = SchedulerConfig(max_batched_num_tokens, 2,
- max_model_len)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 2
- cache_config.num_gpu_blocks = 2
- scheduler = Scheduler(scheduler_config, cache_config, None)
- # Add seq groups to scheduler.
- _, seq_group_a = create_dummy_prompt("1", 1)
- scheduler.add_seq_group(seq_group_a)
- # Schedule seq groups prompts.
- _, out = schedule_and_update_computed_tokens(scheduler)
- assert get_sequence_groups(out) == [seq_group_a]
- # Add a new prefill request B.
- _, seq_group_b = create_dummy_prompt("2", 30)
- scheduler.add_seq_group(seq_group_b)
- # Verify prefill requests are prioritized. Since max_batched_num_tokens
- # is 1, new prefill request has to be scheduled first.
- _, out = schedule_and_update_computed_tokens(scheduler)
- assert get_sequence_groups(out) == [seq_group_b]
- def test_scheduler_schedule_preempt_abort():
- block_size = 4
- max_model_len = 16
- scheduler_config = SchedulerConfig(64, 2, max_model_len)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 2
- cache_config.num_gpu_blocks = 2
- scheduler = Scheduler(scheduler_config, cache_config, None)
- # Add seq groups to scheduler.
- seq_a, seq_group_a = create_dummy_prompt("1", block_size)
- seq_b, seq_group_b = create_dummy_prompt("2", block_size)
- scheduler.add_seq_group(seq_group_a)
- scheduler.add_seq_group(seq_group_b)
- # Schedule seq groups prompts.
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
- assert out.num_batched_tokens == block_size * 2 # seq_a and seq_b
- assert (not out.blocks_to_copy and not out.blocks_to_swap_in
- and not out.blocks_to_swap_out)
- assert len(seq_group_meta) == 2
- assert scheduler.get_num_unfinished_seq_groups() == 2
- # Append "generated" tokens, allowing the sequence to mark prompt tokens as
- # processed.
- append_new_token(out, 1)
- # Schedule seq groups generation and preempt seq group b.
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert get_sequence_groups(out) == [seq_group_a]
- assert out.num_batched_tokens == 1
- assert (not out.blocks_to_copy and not out.blocks_to_swap_in
- and not out.blocks_to_swap_out)
- assert len(seq_group_meta) == 1
- assert scheduler.get_num_unfinished_seq_groups() == 2
- assert out.preempted == 1
- # Abort seq group a. Re-schedule seq group b prompt with recomputation.
- scheduler.abort_seq_group("1")
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert get_sequence_groups(out) == [seq_group_b]
- assert out.num_batched_tokens == 5 # 4 prompt + 1 generation.
- assert (not out.blocks_to_copy and not out.blocks_to_swap_in
- and not out.blocks_to_swap_out)
- assert len(seq_group_meta) == 1
- assert scheduler.get_num_unfinished_seq_groups() == 1
- def test_scheduler_max_seqs():
- block_size = 4
- num_seq_group = 4
- max_seq_group = 2
- max_model_len = 16
- scheduler_config = SchedulerConfig(64, max_seq_group, max_model_len)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 8
- cache_config.num_gpu_blocks = 8
- scheduler = Scheduler(scheduler_config, cache_config, None)
- all_seq_groups: List[SequenceGroup] = []
- # Add seq groups to scheduler.
- for i in range(num_seq_group):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=block_size)
- all_seq_groups.append(seq_group)
- # Append 1 seq group
- scheduler.add_seq_group(all_seq_groups[0])
- # Schedule seq groups prompts.
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
- append_new_token(out, 1)
- # Schedule seq groups generation.
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
- append_new_token(out, 1)
- # Append 2 more seq group
- scheduler.add_seq_group(all_seq_groups[1])
- scheduler.add_seq_group(all_seq_groups[2])
- # Schedule seq groups prompts.
- # Only 1 seq group should be scheduled since max_seq_group is 2
- # and one is prompting.
- _, out = schedule_and_update_computed_tokens(scheduler)
- assert set(get_sequence_groups(out)) == set([all_seq_groups[1]])
- def test_scheduler_delay_factor():
- block_size = 4
- scheduler_config = SchedulerConfig(100, 64, 16, delay_factor=0.5)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 8
- cache_config.num_gpu_blocks = 8
- scheduler = Scheduler(scheduler_config, cache_config, None)
- # schedule first prompt
- seq_group_meta, seq_group = create_dummy_prompt("0",
- prompt_length=block_size)
- scheduler.add_seq_group(seq_group)
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert out.num_prefill_groups > 0
- assert seq_group_meta[0].request_id == '0'
- append_new_token(out, 1)
- # wait for a second before scheduling next prompt
- time.sleep(1)
- seq_group_meta, seq_group = create_dummy_prompt("1",
- prompt_length=block_size)
- scheduler.add_seq_group(seq_group)
- # second prompt should *not* be scheduled
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert out.num_prefill_groups == 0
- assert seq_group_meta[0].request_id == '0'
- append_new_token(out, 1)
- # wait for more than 0.5 second and try again
- time.sleep(0.6)
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert out.num_prefill_groups > 0
- assert seq_group_meta[0].request_id == '1'
- append_new_token(out, 1)
- def test_swapped_out_prioritized():
- scheduler = initialize_scheduler(max_num_seqs=6)
- # best_of=2 * 3 == 6 sequences.
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
- scheduler.add_seq_group(seq_group)
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- # prefill scheduled now.
- assert len(out.scheduled_seq_groups) == 3
- append_new_token(out, 1)
- # The last request should be swapped out.
- scheduler.block_manager.can_append_slots = MagicMock()
- def cannot_append_second_group(seq_group, num_lookahead_slots):
- return seq_group.request_id != "2"
- scheduler.block_manager.can_append_slots.side_effect = (
- cannot_append_second_group)
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- assert len(out.scheduled_seq_groups) == 2
- assert out.num_batched_tokens == 2
- assert out.blocks_to_swap_out != []
- assert out.blocks_to_swap_in == []
- append_new_token(out, 1)
- # Add 1 more task. Swap should be prioritized over prefill.
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
- scheduler.add_seq_group(seq_group)
- seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
- append_new_token(out, 1)
- assert len(out.scheduled_seq_groups) == 3
- # 3 decodes. It is swapped in.
- assert out.num_batched_tokens == 3
- assert out.blocks_to_swap_in != []
- assert out.blocks_to_swap_out == []
- def initialize_scheduler(*,
- max_num_seqs=1000,
- max_token_budget=1000,
- max_model_len=1000,
- lora_config=None):
- block_size = 4
- scheduler_config = SchedulerConfig(max_token_budget, max_num_seqs,
- max_model_len)
- cache_config = CacheConfig(block_size, 1.0, 1, "auto")
- cache_config.num_cpu_blocks = 8
- cache_config.num_gpu_blocks = 8
- scheduler = Scheduler(scheduler_config, cache_config, lora_config)
- return scheduler
- def create_token_budget(token_budget: int = 10000,
- max_num_seqs: int = 10000) -> SchedulingBudget:
- return SchedulingBudget(
- token_budget=token_budget,
- max_num_seqs=max_num_seqs,
- )
- def add_token_budget(budget: SchedulingBudget,
- num_batched_tokens: int = 0,
- num_curr_seqs: int = 0):
- mock_seq_group = create_dummy_prompt('10', prompt_length=60)[1]
- budget.add_num_batched_tokens(mock_seq_group.request_id,
- num_batched_tokens)
- budget.add_num_seqs(mock_seq_group.request_id, num_curr_seqs)
- def test_prefill_schedule_max_prompt_len():
- """
- Test prompt longer than max_prompt_len is aborted.
- """
- scheduler = initialize_scheduler(max_model_len=30)
- _, seq_group = create_dummy_prompt("0", prompt_length=60)
- scheduler.add_seq_group(seq_group)
- budget = create_token_budget()
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 1
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(remaining_waiting) == 0
- def test_prefill_schedule_token_budget():
- """
- Test token budget respected.
- """
- scheduler = initialize_scheduler()
- budget = create_token_budget(token_budget=0)
- for i in range(2):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- # 0 token budget == nothing is scheduled.
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(remaining_waiting) == 2
- # 60 token budget == 1 request scheduled.
- budget = create_token_budget(token_budget=60)
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 1
- assert budget.num_batched_tokens == 60
- assert budget.num_curr_seqs == 1
- assert len(remaining_waiting) == 1
- # Test when current_batched_tokens respected.
- scheduler = initialize_scheduler()
- budget = create_token_budget(token_budget=60)
- add_token_budget(budget, 30, 0)
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- # Cannot schedule a prompt that doesn't fit the budget.
- scheduler.add_seq_group(seq_group)
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 30
- assert budget.num_curr_seqs == 0
- assert len(remaining_waiting) == 1
- budget = create_token_budget(token_budget=90)
- add_token_budget(budget, 30, 0)
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.seq_groups) == 1
- assert budget.num_batched_tokens == 90
- assert budget.num_curr_seqs == 1
- assert len(remaining_waiting) == 0
- def test_prefill_schedule_max_seqs():
- """
- Test max seq respected.
- """
- scheduler = initialize_scheduler()
- budget = create_token_budget(max_num_seqs=2)
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 2
- assert budget.num_batched_tokens == 120
- assert budget.num_curr_seqs == 2
- assert len(remaining_waiting) == 1
- # Verify curr_num_seqs respected.
- scheduler.waiting = deque()
- budget = create_token_budget(max_num_seqs=2)
- add_token_budget(budget, 0, 2)
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 2
- assert len(remaining_waiting) == 1
- def test_prefill_schedule_max_lora():
- """
- Test max lora is respected and prioritized.
- """
- lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
- scheduler = initialize_scheduler(lora_config=lora_config)
- budget = create_token_budget(token_budget=120)
- curr_loras: Set[int] = set()
- for i in range(2):
- _, seq_group = create_dummy_prompt(str(i),
- prompt_length=60,
- lora_request=LoRARequest(
- lora_name=str(i),
- lora_int_id=i + 1,
- lora_path="abc"))
- scheduler.add_seq_group(seq_group)
- # Add two more requests to verify lora is prioritized.
- # 0: Lora, 1: Lora, 2: regular, 3: regular
- # In the first iteration, index 0, 2 is scheduled.
- # If a request is not scheduled because it hits max lora, it is
- # prioritized. Verify that.
- for i in range(2, 4):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- # Schedule 2 requests (0 and 2)
- output = scheduler._schedule_prefills(budget, curr_loras)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 2
- assert budget.num_batched_tokens == 120
- assert budget.num_curr_seqs == 2
- assert len(remaining_waiting) == 2
- assert len(curr_loras) == 1
- # The second lora request is scheduled next as FCFS policy.
- # Reset curr_loras so that it can be scheduled.
- curr_loras = set()
- budget = create_token_budget(token_budget=60)
- output = scheduler._schedule_prefills(budget, curr_loras)
- remaining_waiting = scheduler.waiting
- assert len(output.seq_groups) == 1
- assert output.seq_groups[0].seq_group.request_id == "1"
- assert len(remaining_waiting) == 1
- assert len(curr_loras) == 1
- assert budget.num_batched_tokens == 60
- def test_prefill_schedule_no_block_manager_capacity():
- """
- Test sequence cannot be scheduled due to block manager has no capacity.
- """
- scheduler = initialize_scheduler()
- budget = create_token_budget()
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- scheduler.block_manager.can_allocate = MagicMock()
- scheduler.block_manager.can_allocate.return_value = AllocStatus.LATER
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 0
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(remaining_waiting) == 3
- scheduler = initialize_scheduler()
- budget = create_token_budget()
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler.add_seq_group(seq_group)
- scheduler.block_manager.can_allocate = MagicMock()
- scheduler.block_manager.can_allocate.return_value = AllocStatus.NEVER
- output = scheduler._schedule_prefills(budget, None)
- remaining_waiting = scheduler.waiting
- assert len(output.ignored_seq_groups) == 3
- assert len(output.seq_groups) == 0
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(remaining_waiting) == 0
- def test_decode_schedule_preempted():
- """
- Test decodes cannot be scheduled and preempted.
- """
- scheduler = initialize_scheduler()
- curr_loras = None
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._add_seq_group_to_running(seq_group)
- scheduler.block_manager.can_append_slots = MagicMock()
- def cannot_append_second_group(seq_group, num_lookahead_slots):
- return seq_group.request_id != "1"
- scheduler.block_manager.can_append_slots.side_effect = (
- cannot_append_second_group)
- # 1 cannot be scheduled, and the lowest priority (request 2)
- # should be preempted. 1 will also be preempted.
- budget = create_token_budget()
- output = scheduler._schedule_running(budget, curr_loras)
- remainig_running = scheduler.running
- assert len(remainig_running) == 0
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- assert output.decode_seq_groups[0].seq_group.request_id == "0"
- assert len(output.preempted) == 2
- # Verify budgets are updated.
- assert budget.num_batched_tokens == 1
- # NOTE: When enable_chunk is False, num_seqs budget is not updated.
- # assert budget.num_curr_seqs == 1
- # Both should be preempted, not swapped.
- assert output.blocks_to_swap_out == []
- # Nothing is copied.
- assert output.blocks_to_copy == []
- def test_decode_swap_beam_search():
- """
- Test best_of > 1 swap out blocks
- """
- scheduler = initialize_scheduler()
- curr_loras = None
- budget = create_token_budget()
- for i in range(3):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- scheduler._add_seq_group_to_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- budget.add_num_seqs(seq_group.request_id,
- seq_group.get_max_num_running_seqs())
- budget.add_num_batched_tokens(
- seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))
- # The last request should be swapped out.
- scheduler.block_manager.can_append_slots = MagicMock()
- def cannot_append_second_group(seq_group, num_lookahead_slots):
- return seq_group.request_id != "2"
- scheduler.block_manager.can_append_slots.side_effect = (
- cannot_append_second_group)
- scheduler.block_manager.swap_out = MagicMock()
- expected_swap_mapping = [("5", "7")]
- scheduler.block_manager.swap_out.return_value = expected_swap_mapping
- output = scheduler._schedule_running(budget, curr_loras)
- remainig_running = scheduler.running
- assert len(remainig_running) == 0
- assert len(output.decode_seq_groups) == 2
- assert len(output.prefill_seq_groups) == 0
- assert output.decode_seq_groups[0].seq_group.request_id == "0"
- assert output.decode_seq_groups[1].seq_group.request_id == "1"
- assert len(output.preempted) == 0
- assert len(output.swapped_out) == 1
- # Budget should refledct preempted requests.
- assert budget.num_batched_tokens == 2
- # since there are 2 sequences, 2 should be subtracted.
- assert budget.num_curr_seqs == 4
- # Both should be preempted, not swapped.
- assert output.blocks_to_swap_out == expected_swap_mapping
- # Nothing is copied.
- assert output.blocks_to_copy == []
- def test_schedule_decode_blocks_to_copy_update():
- """
- Verify blocks_to_copy is updated.
- """
- scheduler = initialize_scheduler()
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- curr_loras = None
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._add_seq_group_to_running(seq_group)
- # The last request should be swapped out.
- scheduler.block_manager.append_slots = MagicMock()
- scheduler.block_manager.append_slots.return_value = [(2, 3)]
- budget = create_token_budget()
- output = scheduler._schedule_running(budget, curr_loras)
- remaining_running = scheduler.running
- assert len(remaining_running) == 0
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- assert len(output.preempted) == 0
- assert len(output.swapped_out) == 0
- # Nothing is preempted.
- assert output.blocks_to_swap_out == []
- # Since append_slot returns the source -> dist mapping, it should
- # applied.
- assert output.blocks_to_copy == [(2, 3)]
- def test_schedule_swapped_simple():
- scheduler = initialize_scheduler()
- curr_loras = None
- blocks_to_swap_out: List[Tuple[int, int]] = []
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- budget = create_token_budget()
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 0
- assert budget.num_batched_tokens == 1
- assert budget.num_curr_seqs == 2
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- # swap in is the reverse of swap out
- blocks_to_swap_in_reverse = []
- for swapin, swapout in output.blocks_to_swap_in:
- blocks_to_swap_in_reverse.append((swapout, swapin))
- assert blocks_to_swap_out == blocks_to_swap_in_reverse
- def test_schedule_swapped_max_token_budget():
- scheduler = initialize_scheduler()
- curr_loras = None
- blocks_to_swap_out: List[Tuple[int, int]] = []
- for _ in range(2):
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- budget = create_token_budget(token_budget=1)
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 1
- assert budget.num_batched_tokens == 1
- assert budget.num_curr_seqs == 2
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- # Verify num_batched_tokens are respected.
- budget = create_token_budget(token_budget=1)
- add_token_budget(budget, 1, 0)
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 1
- assert budget.num_batched_tokens == 1
- assert budget.num_curr_seqs == 0
- assert len(output.decode_seq_groups) == 0
- assert len(output.prefill_seq_groups) == 0
- def test_schedule_swapped_max_seqs():
- scheduler = initialize_scheduler()
- curr_loras = None
- blocks_to_swap_out: List[Tuple[int, int]] = []
- for i in range(4):
- _, seq_group = create_dummy_prompt(str(i), prompt_length=60)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- budget = create_token_budget(max_num_seqs=2)
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 2
- assert budget.num_batched_tokens == 2
- assert budget.num_curr_seqs == 2
- assert len(output.decode_seq_groups) == 2
- assert len(output.prefill_seq_groups) == 0
- # Verify num_curr_seqs are respected.
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 2
- assert budget.num_batched_tokens == 2
- assert budget.num_curr_seqs == 2
- assert len(output.decode_seq_groups) == 0
- assert len(output.prefill_seq_groups) == 0
- def test_schedule_swapped_max_loras():
- lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
- scheduler = initialize_scheduler(lora_config=lora_config)
- curr_loras: Set[int] = set()
- blocks_to_swap_out: List[Tuple[int, int]] = []
- for i in range(2):
- _, seq_group = create_dummy_prompt(str(i),
- prompt_length=60,
- lora_request=LoRARequest(
- lora_name=str(i),
- lora_int_id=i + 1,
- lora_path="abc"))
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- budget = create_token_budget()
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 1
- assert budget.num_batched_tokens == 1
- assert budget.num_curr_seqs == 1
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- assert len(curr_loras) == 1
- def test_schedule_swapped_cannot_swap_in():
- scheduler = initialize_scheduler()
- curr_loras = None
- blocks_to_swap_out: List[Tuple[int, int]] = []
- for _ in range(2):
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- # The last request should be swapped out.
- scheduler.block_manager.can_swap_in = MagicMock()
- scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
- # Since we cannot swap in, none of the requests are swapped in.
- budget = create_token_budget()
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 2
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(output.decode_seq_groups) == 0
- assert len(output.prefill_seq_groups) == 0
- def test_infeasible_swap():
- scheduler = initialize_scheduler()
- curr_loras = None
- blocks_to_swap_out: List[Tuple[int, int]] = []
- for _ in range(2):
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- # The last request should be swapped out.
- scheduler.block_manager.can_swap_in = MagicMock()
- scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
- # Since we cannot swap in, none of the requests are swapped in.
- budget = create_token_budget()
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 0
- assert len(output.infeasible_seq_groups) == 2
- assert budget.num_batched_tokens == 0
- assert budget.num_curr_seqs == 0
- assert len(output.decode_seq_groups) == 0
- assert len(output.prefill_seq_groups) == 0
- def test_schedule_swapped_blocks_to_copy():
- scheduler = initialize_scheduler()
- curr_loras = None
- _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
- scheduler._allocate_and_set_running(seq_group)
- append_new_token_seq_group(60, seq_group, 1)
- blocks_to_swap_out: List[Tuple[int, int]] = []
- scheduler._swap_out(seq_group, blocks_to_swap_out)
- scheduler._add_seq_group_to_swapped(seq_group)
- # The last request should be swapped out.
- scheduler.block_manager.append_slots = MagicMock()
- scheduler.block_manager.append_slots.return_value = [(2, 3)]
- budget = create_token_budget()
- output = scheduler._schedule_swapped(budget, curr_loras)
- remaining_swapped = scheduler.swapped
- assert len(remaining_swapped) == 0
- assert len(output.decode_seq_groups) == 1
- assert len(output.prefill_seq_groups) == 0
- assert output.blocks_to_copy == [(2, 3)]
- def test_scheduling_budget():
- TOKEN_BUDGET = 4
- MAX_SEQS = 4
- budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)
- assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
- assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
- assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
- assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
- assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
- assert budget.remaining_token_budget() == TOKEN_BUDGET
- # Verify add/subtract num batched tokens.
- _, seq_group = create_dummy_prompt("1", 3)
- budget.add_num_batched_tokens(seq_group.request_id, 2)
- assert budget.remaining_token_budget() == 2
- assert budget.num_batched_tokens == 2
- assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
- assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
- # Verify adding another seq group is no-op.
- budget.add_num_batched_tokens(seq_group.request_id, 2)
- assert budget.remaining_token_budget() == 2
- assert budget.num_batched_tokens == 2
- budget.subtract_num_batched_tokens(seq_group.request_id, 2)
- assert budget.remaining_token_budget() == 4
- assert budget.num_batched_tokens == 0
- budget.subtract_num_batched_tokens(seq_group.request_id, 2)
- assert budget.remaining_token_budget() == 4
- assert budget.num_batched_tokens == 0
- # Verify add/subtract max seqs.
- _, seq_group = create_dummy_prompt("1", 3)
- budget.add_num_seqs(seq_group.request_id, 2)
- assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
- assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
- assert budget.num_curr_seqs == 2
- # Verify adding another seq group is no-op.
- budget.add_num_seqs(seq_group.request_id, 2)
- assert budget.num_curr_seqs == 2
- budget.subtract_num_seqs(seq_group.request_id, 2)
- assert budget.num_curr_seqs == 0
- budget.subtract_num_seqs(seq_group.request_id, 2)
- assert budget.num_curr_seqs == 0
|