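"""Unit tests for the Aphrodite scheduler and its SchedulingBudget."""
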
import time
from collections import deque
from typing import List, Set, Tuple
from unittest.mock import MagicMock

import pytest

from aphrodite.common.config import CacheConfig, LoRAConfig, SchedulerConfig
from aphrodite.common.sequence import SequenceGroup, SequenceStatus
from aphrodite.lora.request import LoRARequest
from aphrodite.processing.interfaces import AllocStatus
from aphrodite.processing.scheduler import Scheduler, SchedulingBudget

from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_add_seq_group(use_v2_block_manager: bool):
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq group to scheduler.
    num_seq_group = 4
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        assert scheduler.get_num_unfinished_seq_groups() == i + 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add multiple seq groups to scheduler.
    num_seq_group = 4
    request_ids: Set[str] = set()
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i), block_size)
        scheduler.add_seq_group(seq_group)
        request_ids.add(str(i))

    # Abort all added seq groups.
    assert scheduler.get_num_unfinished_seq_groups() == num_seq_group
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_simple(use_v2_block_manager: bool):
    block_size = 4
    num_seq_group = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        num_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)
    running: List[SequenceGroup] = []

    # Add seq groups to scheduler.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        running.append(seq_group)

    # Schedule seq group prompts.
    num_tokens = block_size * num_seq_group
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    assert out.num_batched_tokens == num_tokens
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)

    # Schedule seq group generation.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    assert out.num_batched_tokens == num_seq_group
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        max_batched_num_tokens,
        2,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)

    # Schedule seq group prompts.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized. Request B's 30-token prompt
    # consumes the entire token budget, so it has to be scheduled alone.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
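    """Test that a preempted seq group is rescheduled with recomputation
    once a running seq group is aborted."""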
    block_size = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64, 2, max_model_len, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    seq_a, seq_group_a = create_dummy_prompt("1",
                                             block_size,
                                             block_size=block_size)
    seq_b, seq_group_b = create_dummy_prompt("2",
                                             block_size,
                                             block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    scheduler.add_seq_group(seq_group_b)

    # Schedule seq group prompts.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens, allowing the sequence to mark prompt tokens
    # as processed.
    append_new_token(out, 1)

    # Schedule seq group generation and preempt seq group b.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Abort seq group a. Re-schedule seq group b prompt with recomputation.
    scheduler.abort_seq_group("1")
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_max_seqs(use_v2_block_manager: bool):
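    """Test that at most max_seq_group seq groups run concurrently."""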
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        max_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)
    all_seq_groups: List[SequenceGroup] = []

    # Create seq groups.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        all_seq_groups.append(seq_group)

    # Append 1 seq group.
    scheduler.add_seq_group(all_seq_groups[0])

    # Schedule seq group prompts.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
    append_new_token(out, 1)

    # Schedule seq group generation.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
    append_new_token(out, 1)

    # Append 2 more seq groups.
    scheduler.add_seq_group(all_seq_groups[1])
    scheduler.add_seq_group(all_seq_groups[2])

    # Schedule seq group prompts.
    # Only 1 seq group should be scheduled since max_seq_group is 2
    # and one is already running.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[1]])


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_delay_factor(use_v2_block_manager: bool):
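    """Test that delay_factor postpones new prefills until enough time
    has passed since the previous prompt was scheduled."""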
    block_size = 4
    scheduler_config = SchedulerConfig(
        100,
        64,
        16,
        delay_factor=0.5,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Schedule the first prompt.
    seq_group_meta, seq_group = create_dummy_prompt("0",
                                                    prompt_length=block_size,
                                                    block_size=block_size)
    scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # Wait for a second before scheduling the next prompt.
    time.sleep(1)
    seq_group_meta, seq_group = create_dummy_prompt("1",
                                                    prompt_length=block_size,
                                                    block_size=block_size)
    scheduler.add_seq_group(seq_group)

    # The second prompt should *not* be scheduled yet.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # Wait for more than 0.5 second and try again.
    time.sleep(0.6)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '1'
    append_new_token(out, 1)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swapped_out_prioritized(use_v2_block_manager: bool):
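    """Test that swapped-out requests are swapped back in before new
    prefills are scheduled."""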
    block_size = 4
    scheduler = initialize_scheduler(max_num_seqs=6,
                                     block_size=block_size,
                                     use_v2_block_manager=use_v2_block_manager,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)

    # best_of=2 * 3 == 6 sequences.
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    # Prefills are scheduled now.
    assert len(out.scheduled_seq_groups) == 3
    append_new_token(out, 1)

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_second_group)

    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2
    assert out.blocks_to_swap_out != []
    assert out.blocks_to_swap_in == []
    append_new_token(out, 1)

    # Add 1 more task. Swap should be prioritized over prefill.
    _, seq_group = create_dummy_prompt(str(i + 1),
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
    assert len(out.scheduled_seq_groups) == 3
    # 3 decodes. The swapped-out group is swapped back in.
    assert out.num_batched_tokens == 3
    assert out.blocks_to_swap_in != []
    assert out.blocks_to_swap_out == []


def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    use_v2_block_manager=False,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
):
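    """Build a Scheduler for tests; the defaults are large enough that a
    test only needs to override the limit it exercises."""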
    scheduler_config = SchedulerConfig(
        max_token_budget,
        max_num_seqs,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    scheduler = Scheduler(scheduler_config, cache_config, lora_config)
    return scheduler


def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
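    """Create a SchedulingBudget with effectively unlimited defaults."""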
    return SchedulingBudget(
        token_budget=token_budget,
        max_num_seqs=max_num_seqs,
    )


def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
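    """Pre-consume part of a budget on behalf of a dummy seq group."""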
    mock_seq_group = create_dummy_prompt('10', prompt_length=60)[1]
    budget.add_num_batched_tokens(mock_seq_group.request_id,
                                  num_batched_tokens)
    budget.add_num_seqs(mock_seq_group.request_id, num_curr_seqs)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
    """
    Test that a prompt longer than max_model_len is ignored.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    _, seq_group = create_dummy_prompt("0",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
    """
    Test that the token budget is respected.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 1

    # Test that tokens already batched in the budget are respected.
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    _, seq_group = create_dummy_prompt(str(i),
                                       prompt_length=60,
                                       block_size=block_size)
    # Cannot schedule a prompt that doesn't fit the budget.
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 1

    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
    """
    Test that the max number of seqs is respected.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(max_num_seqs=2)
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1

    # Verify that seqs already counted in the budget are respected.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    _, seq_group = create_dummy_prompt(str(i),
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
    """
    Test that max_loras is respected and that LoRA requests are prioritized.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler.add_seq_group(seq_group)

    # Add two more requests to verify lora is prioritized.
    # 0: LoRA, 1: LoRA, 2: regular, 3: regular
    # In the first iteration, requests 0 and 2 are scheduled.
    # If a request is not scheduled because it hits max_loras, it is
    # prioritized. Verify that.
    for i in range(2, 4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # Schedule 2 requests (0 and 2).
    output = scheduler._schedule_prefills(budget, curr_loras)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 2
    assert len(curr_loras) == 1

    # The second LoRA request is scheduled next, following the FCFS policy.
    # Reset curr_loras so that it can be scheduled.
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(remaining_waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_no_block_manager_capacity(
        use_v2_block_manager: bool):
    """
    Test that a sequence cannot be scheduled when the block manager has no
    capacity.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=128,
                                     num_cpu_blocks=128)
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    scheduler.block_manager.can_allocate = MagicMock()
    scheduler.block_manager.can_allocate.return_value = AllocStatus.LATER
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 3

    scheduler = initialize_scheduler()
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    scheduler.block_manager.can_allocate = MagicMock()
    scheduler.block_manager.can_allocate.return_value = AllocStatus.NEVER
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_schedule_preempted(use_v2_block_manager: bool):
    """
    Test that decodes that cannot be scheduled are preempted.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._add_seq_group_to_running(seq_group)
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "1"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_second_group)

    # Request 1 cannot be scheduled, so the lowest-priority request (2) is
    # preempted first; request 1 is then preempted as well.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When chunking is disabled, the num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_swap_beam_search(use_v2_block_manager: bool):
    """
    Test that requests with best_of > 1 swap out their blocks instead of
    being preempted with recomputation.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=64,
                                     num_cpu_blocks=64)
    curr_loras = None
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        scheduler._add_seq_group_to_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        budget.add_num_seqs(seq_group.request_id,
                            seq_group.get_max_num_running_seqs())
        budget.add_num_batched_tokens(
            seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_second_group)
    scheduler.block_manager.swap_out = MagicMock()
    expected_swap_mapping = [("5", "7")]
    scheduler.block_manager.swap_out.return_value = expected_swap_mapping

    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert output.decode_seq_groups[1].seq_group.request_id == "1"
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 1
    # Budget should reflect the swapped-out request.
    assert budget.num_batched_tokens == 2
    # Since the swapped-out group has 2 sequences, 2 should be subtracted.
    assert budget.num_curr_seqs == 4
    # The mocked swap mapping should be propagated to the output.
    assert output.blocks_to_swap_out == expected_swap_mapping
    # Nothing is copied.
    assert output.blocks_to_copy == []


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
    """
    Verify blocks_to_copy is updated.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=4,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Mock append_slots so that it reports a block copy (copy-on-write).
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    # Nothing is preempted.
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> dest block mapping, it should
    # be applied.
    assert output.blocks_to_copy == [(2, 3)]


def test_schedule_swapped_simple():
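    """Test that a swapped-out seq group is swapped back in and scheduled."""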
    scheduler = initialize_scheduler()
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # Swap in is the reverse of swap out.
    blocks_to_swap_in_reverse = []
    for swapin, swapout in output.blocks_to_swap_in:
        blocks_to_swap_in_reverse.append((swapout, swapin))
    assert blocks_to_swap_out == blocks_to_swap_in_reverse


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_token_budget(use_v2_block_manager: bool):
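    """Test that swap-in respects the token budget."""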
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(token_budget=1)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Verify num_batched_tokens is respected.
    budget = create_token_budget(token_budget=1)
    add_token_budget(budget, 1, 0)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_seqs(use_v2_block_manager: bool):
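    """Test that swap-in respects max_num_seqs."""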
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=4)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0

    # Verify num_curr_seqs is respected.
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
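    """Test that swap-in respects the max_loras limit."""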
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
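    """Test that nothing is swapped in while the block manager reports
    AllocStatus.LATER."""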
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Mock the block manager so that swap-in is not possible for now.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_infeasible_swap(use_v2_block_manager: bool):
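    """Test that requests whose swap-in can never succeed are marked
    infeasible."""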
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Mock the block manager so that swap-in is never possible.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
    # Since the swap can never succeed, the requests are dropped as
    # infeasible instead of being swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_blocks_to_copy(use_v2_block_manager: bool):
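    """Test that blocks_to_copy reported during swap-in is propagated."""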
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Mock append_slots so that it reports a block copy (copy-on-write).
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]


def test_scheduling_budget():
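    """Exercise the SchedulingBudget bookkeeping API directly."""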
    TOKEN_BUDGET = 4
    MAX_SEQS = 4
    budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
    assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
    assert budget.remaining_token_budget() == TOKEN_BUDGET

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Verify that adding the same seq group again is a no-op.
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Verify that adding the same seq group again is a no-op.
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 2
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0