test_block_manager_v2.py

import pytest

from aphrodite.common.sequence import Logprob, SequenceStatus
from aphrodite.common.utils import chunk_list
from aphrodite.processing.block.utils import (
    STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE, STR_NOT_IMPL_ENC_DEC_SWA)
from aphrodite.processing.block_manager_v2 import BlockSpaceManagerV2
from aphrodite.processing.interfaces import AllocStatus

from ..utils import (create_dummy_prompt, create_seq_group,
                     create_seq_group_encoder_decoder)


@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [8, 40, 80])
@pytest.mark.parametrize("num_seqs_per_group", [1, 4])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_seq_group(block_size: int, num_seqs_per_group: int,
                                num_gpu_blocks: int, watermark: float):
    block_manager = BlockSpaceManagerV2(
        block_size=block_size,
        num_gpu_blocks=num_gpu_blocks,
        num_cpu_blocks=1024,
        watermark=watermark,
    )
    num_watermark_blocks = int(watermark * num_gpu_blocks)

    num_output_blocks_per_seq = 1

    # NOTE: This should be num_output_blocks_per_seq * num_seqs_per_group, but
    # the current implementation assumes all seqs are new prompts / don't have
    # different output lens.
    num_output_blocks = num_output_blocks_per_seq

    for num_prompt_blocks in range(1, num_gpu_blocks - num_output_blocks):
        seq_group = create_seq_group(
            seq_prompt_len=block_size * num_prompt_blocks,
            seq_output_lens=[
                block_size * num_output_blocks_per_seq
                for _ in range(num_seqs_per_group)
            ],
        )

        assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks

        can_allocate_result = block_manager.can_allocate(seq_group)

        num_required_blocks = num_prompt_blocks + num_output_blocks

        if num_gpu_blocks - num_required_blocks < num_watermark_blocks:
            assert can_allocate_result == AllocStatus.NEVER
        elif num_gpu_blocks >= num_required_blocks:
            assert can_allocate_result == AllocStatus.OK
        else:
            assert can_allocate_result == AllocStatus.LATER
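
# Worked example of the status rule exercised above (illustrative numbers
# only, not an aphrodite API): with num_gpu_blocks=8 and watermark=0.5,
# num_watermark_blocks = 4. A request needing 5 blocks would leave
# 8 - 5 = 3 < 4 free blocks, so can_allocate() should report
# AllocStatus.NEVER, while one needing 4 blocks leaves exactly the watermark
# and should report AllocStatus.OK.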


@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [16, 80, 160])
@pytest.mark.parametrize("num_seqs_per_group", [1, 4])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_seq_group_encoder_decoder(block_size: int,
                                                num_seqs_per_group: int,
                                                num_gpu_blocks: int,
                                                watermark: float):
    block_manager = BlockSpaceManagerV2(
        block_size=block_size,
        num_gpu_blocks=num_gpu_blocks,
        num_cpu_blocks=1024,
        watermark=watermark,
    )
    num_watermark_blocks = int(watermark * num_gpu_blocks)

    num_output_blocks_per_seq = 1

    # NOTE: This should be num_output_blocks_per_seq * num_seqs_per_group, but
    # the current implementation assumes all seqs are new prompts / don't have
    # different output lens.
    num_output_blocks = num_output_blocks_per_seq

    for bdx, num_prompt_blocks in enumerate(
            range(1, num_gpu_blocks - num_output_blocks)):
        num_cross_blocks_per_seq = num_prompt_blocks

        seq_group = create_seq_group_encoder_decoder(
            seq_prompt_len=block_size * num_prompt_blocks,
            seq_output_lens=[
                block_size * num_output_blocks_per_seq
                for _ in range(num_seqs_per_group)
            ],
            request_id=str(bdx))

        assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks

        can_allocate_result = block_manager.can_allocate(seq_group)

        num_required_blocks = (num_prompt_blocks + num_output_blocks +
                               num_cross_blocks_per_seq)

        if num_gpu_blocks - num_required_blocks < num_watermark_blocks:
            assert can_allocate_result == AllocStatus.NEVER
        elif num_gpu_blocks >= num_required_blocks:
            assert can_allocate_result == AllocStatus.OK
        else:
            assert can_allocate_result == AllocStatus.LATER
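
# Worked example of the encoder/decoder accounting above (illustrative
# numbers only): with num_gpu_blocks=16, watermark=0.5 (8 watermark blocks)
# and num_prompt_blocks=3, a seq group needs 3 (decoder prompt) + 1 (output)
# + 3 (cross blocks for the encoder prompt) = 7 blocks; 16 - 7 = 9 >= 8 free
# blocks remain, so can_allocate() should report AllocStatus.OK.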


@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [16])
@pytest.mark.parametrize("num_seqs_per_group", [1])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_encoder_decoder_fails_with_swa(block_size: int,
                                                     num_seqs_per_group: int,
                                                     num_gpu_blocks: int,
                                                     watermark: float):
    '''
    SWA is short for Sliding Window Attention.

    At the time of writing, block manager v2 does not support SWA. However,
    even once SWA is implemented for block manager v2, a separate workstream
    will most likely be required to enable SWA for encoder/decoder models.

    Therefore this test enforces that one of the following cases holds true:

    1. Block manager v2 does not support SWA at all (true at time of writing)
    2. Block manager v2 fails with NotImplementedError when SWA is enabled
       AND a SequenceGroup with an encoder sequence (i.e. in support of an
       encoder/decoder model) is passed into can_allocate() as an argument

    The setup for this test is a stripped-down version of
    test_can_allocate_seq_group_encoder_decoder().
    '''

    with pytest.raises((NotImplementedError, AssertionError)) as exc_info:
        block_manager = BlockSpaceManagerV2(
            block_size=block_size,
            num_gpu_blocks=num_gpu_blocks,
            num_cpu_blocks=1024,
            watermark=watermark,
            sliding_window=5  # SWA
        )

        num_output_blocks_per_seq = 1

        num_prompt_blocks = 1
        num_output_blocks = num_output_blocks_per_seq

        seq_group = create_seq_group_encoder_decoder(
            seq_prompt_len=block_size * num_prompt_blocks,
            seq_output_lens=[
                block_size * num_output_blocks_per_seq
                for _ in range(num_seqs_per_group)
            ],
            request_id="0")

        assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks

        block_manager.can_allocate(seq_group)

    # Assert that either
    # 1. The block manager v2 constructor fails with an assertion that
    #    sliding window is not yet supported (the most likely near-term
    #    outcome at time of writing), or
    # 2. can_allocate() fails with NotImplementedError due to the combination
    #    of encoder/decoder and sliding window attention
    if isinstance(exc_info.value, NotImplementedError):
        assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_SWA
    elif isinstance(exc_info.value, AssertionError):
        assert str(exc_info.value) == "Sliding window not yet supported"
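
# NOTE: pytest.raises() accepts a tuple of exception types, so the block
# above passes if *either* the constructor raises AssertionError or
# can_allocate() raises NotImplementedError; the isinstance() checks then
# pin down which of the two failure modes actually occurred.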


@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [16])
@pytest.mark.parametrize("num_seqs_per_group", [1])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_encoder_decoder_fails_with_prefix_cache(
        block_size: int, num_seqs_per_group: int, num_gpu_blocks: int,
        watermark: float):
    block_manager = BlockSpaceManagerV2(
        block_size=block_size,
        num_gpu_blocks=num_gpu_blocks,
        num_cpu_blocks=1024,
        watermark=watermark,
        enable_caching=True  # Prefix cache
    )

    num_output_blocks_per_seq = 1

    num_prompt_blocks = 1
    num_output_blocks = num_output_blocks_per_seq

    seq_group = create_seq_group_encoder_decoder(
        seq_prompt_len=block_size * num_prompt_blocks,
        seq_output_lens=[
            block_size * num_output_blocks_per_seq
            for _ in range(num_seqs_per_group)
        ],
        request_id="0")

    assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks

    # Assert that can_allocate() fails with NotImplementedError due to the
    # combination of encoder/decoder and prefix caching.
    with pytest.raises(NotImplementedError) as exc_info:
        block_manager.can_allocate(seq_group)

    assert str(exc_info.value) == STR_NOT_IMPL_ENC_DEC_PREFIX_CACHE
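
# NOTE: unlike the SWA case above, the BlockSpaceManagerV2 constructor
# accepts enable_caching=True without complaint, so the incompatibility only
# surfaces once an encoder/decoder seq group reaches can_allocate(); hence
# the narrower pytest.raises(NotImplementedError) here.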


@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("prompt_len", [1, 7, 8])
@pytest.mark.parametrize("num_slots_to_append", [1, 8, 129])
@pytest.mark.parametrize("num_lookahead_slots", [0, 10])
def test_append_slots(block_size, prompt_len, num_slots_to_append,
                      num_lookahead_slots):
    """Verify append_slots consumes the correct number of blocks from the
    block table.
    """

    num_gpu_blocks = 1024
    watermark = 0.1
    block_manager = BlockSpaceManagerV2(
        block_size=block_size,
        num_gpu_blocks=num_gpu_blocks,
        num_cpu_blocks=0,
        watermark=watermark,
    )

    seq_group = create_seq_group(
        seq_prompt_len=prompt_len,
        seq_output_lens=[0],
    )

    # Allocate seq
    assert block_manager.can_allocate(seq_group)
    block_manager.allocate(seq_group)

    # Set seq to RUNNING
    seq = seq_group.get_seqs()[0]
    seq.status = SequenceStatus.RUNNING

    # Append tokens to the sequence
    for token_id in range(num_slots_to_append):
        seq.append_token_id(token_id, {token_id: Logprob(0.0)})

    # Append slots for new tokens and lookahead slots.
    free_blocks_before_append = block_manager.get_num_free_gpu_blocks()
    block_manager.append_slots(seq, num_lookahead_slots)
    num_consumed_blocks = (free_blocks_before_append -
                           block_manager.get_num_free_gpu_blocks())

    # Expect the consumed blocks to be the new blocks required to support
    # the new slots.
    expected_consumed_blocks = len(
        list(
            chunk_list(
                list(
                    range(prompt_len + num_slots_to_append +
                          num_lookahead_slots)),
                block_size))) - len(
                    list(chunk_list(list(range(prompt_len)), block_size)))
    assert num_consumed_blocks == expected_consumed_blocks
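
# The chunk_list() counting above is just ceil-division on token counts
# (a sketch for clarity, not used by the test):
#
#   expected = ceil((prompt_len + num_slots_to_append + num_lookahead_slots)
#                   / block_size) - ceil(prompt_len / block_size)
#
# e.g. block_size=8, prompt_len=7, num_slots_to_append=8,
# num_lookahead_slots=10: ceil(25 / 8) - ceil(7 / 8) = 4 - 1 = 3 newly
# consumed blocks.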


@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_cpu_blocks", [4])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap(block_size, num_cpu_blocks, num_gpu_blocks, num_lookahead_slots,
              enable_caching):
    """Verify that the number of blocks on the source/destination device is
    correct after swapping a sequence group in/out (no missing or extra
    blocks).
    """
    block_manager = BlockSpaceManagerV2(block_size,
                                        num_cpu_blocks=num_cpu_blocks,
                                        num_gpu_blocks=num_gpu_blocks,
                                        watermark=0,
                                        enable_caching=enable_caching)
    prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1)
    prompt.status = SequenceStatus.WAITING
    block_manager.allocate(seq_group)

    # Emulate a forward pass by appending a single token.
    # The block manager then knows how many unprocessed
    # tokens will be written in the next forward pass.
    token_id = 0
    prompt.status = SequenceStatus.RUNNING
    prompt.append_token_id(token_id, {token_id: Logprob(0.0)})

    # Swap seq group from GPU -> CPU.
    gpu_blocks = block_manager.get_block_table(prompt)
    assert block_manager.can_swap_out(seq_group)
    before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    mapping = block_manager.swap_out(seq_group)
    mapping_keys = [key for key, _ in mapping]
    assert mapping_keys == gpu_blocks
    after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
    assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
    prompt.status = SequenceStatus.SWAPPED

    # Swap seq group from CPU -> GPU.
    assert block_manager.can_swap_in(seq_group, num_lookahead_slots)
    before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    mapping = block_manager.swap_in(seq_group)
    cpu_blocks = block_manager.get_block_table(prompt)
    mapping_keys = [key for key, _ in mapping]
    assert mapping_keys == [cpu_blocks[0]]
    after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
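
# NOTE: swap_out()/swap_in() return the block-number mapping as
# (source, destination) pairs, so the mapping_keys checks above only verify
# the source side of each copy; the free-block deltas before/after verify
# that no blocks went missing or leaked on either device.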


@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [3, 8, 10])
@pytest.mark.parametrize("enable_caching", [True, False])
def test_can_swap(block_size, num_gpu_blocks, num_lookahead_slots,
                  enable_caching):
    """Verify that the block manager can correctly determine whether a
    sequence group can be swapped in/out.
    """
    num_cpu_blocks = num_gpu_blocks
    block_manager = BlockSpaceManagerV2(block_size,
                                        num_cpu_blocks=num_cpu_blocks,
                                        num_gpu_blocks=num_gpu_blocks,
                                        watermark=0,
                                        enable_caching=enable_caching)
    prompt, seq_group = create_dummy_prompt(
        "1", prompt_length=(num_gpu_blocks - 1) * block_size - 1)
    prompt.status = SequenceStatus.WAITING
    block_manager.allocate(seq_group)
    prompt.status = SequenceStatus.RUNNING

    # Swap seq group from GPU -> CPU.
    gpu_blocks = block_manager.get_block_table(prompt)
    assert block_manager.can_swap_out(seq_group)
    before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    mapping = block_manager.swap_out(seq_group)
    mapping_keys = [key for key, _ in mapping]
    assert mapping_keys == gpu_blocks
    after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
    after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
    assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
    assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
    prompt.status = SequenceStatus.SWAPPED

    # At this point there are still enough free GPU blocks to swap the seq
    # group back in.
    if num_lookahead_slots <= block_size:
        assert block_manager.can_swap_in(seq_group,
                                         num_lookahead_slots) == AllocStatus.OK
    else:
        assert block_manager.can_swap_in(
            seq_group, num_lookahead_slots) == AllocStatus.NEVER

    # Allocating a second prompt consumes two of the freed GPU blocks
    # (evicting cached blocks), so the first seq group can no longer be
    # swapped in immediately.
    prompt2_len = 2 * block_size - 1
    prompt2, seq_group2 = create_dummy_prompt(
        "2",
        prompt_length=prompt2_len,
        prompt_tokens=[10000 + i for i in range(prompt2_len)])
    prompt2.status = SequenceStatus.WAITING
    block_manager.allocate(seq_group2)

    # Swap seq group from CPU -> GPU.
    if num_lookahead_slots <= block_size:
        assert block_manager.can_swap_in(
            seq_group, num_lookahead_slots) == AllocStatus.LATER
    else:
        assert block_manager.can_swap_in(
            seq_group, num_lookahead_slots) == AllocStatus.NEVER
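
# Worked numbers for the AllocStatus.LATER case above (illustrative): with
# num_gpu_blocks=4 and block_size=8, the first prompt spans
# (4 - 1) * 8 - 1 = 23 tokens = 3 blocks. After the swap-out all four GPU
# blocks are free; allocating prompt2 (2 * 8 - 1 = 15 tokens = 2 blocks)
# leaves only 2 free, not enough for the 3 blocks (plus lookahead) needed to
# swap the first group back in.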

# TODO: add comprehensive tests for swapping at the allocator level.


@pytest.mark.parametrize("block_size", [8, 16])
@pytest.mark.parametrize("prompt_len", [10, 300, 1000])
@pytest.mark.parametrize("num_slots_to_append", [50])
@pytest.mark.parametrize("sliding_window", [20, 32, 200, 512])
def test_sliding_window(block_size, prompt_len, num_slots_to_append,
                        sliding_window):
    """Verify append_slots consumes the correct number of blocks from the
    block table when a sliding window is configured.
    """

    num_gpu_blocks = 1024
    watermark = 0.1
    block_manager = BlockSpaceManagerV2(
        block_size=block_size,
        num_gpu_blocks=num_gpu_blocks,
        num_cpu_blocks=0,
        watermark=watermark,
        sliding_window=sliding_window,
    )

    def check_used(min_n, max_n=None):
        if max_n is None:
            max_n = min_n
        used = num_gpu_blocks - block_manager.get_num_free_gpu_blocks()
        # print("check", min_n, used, max_n)
        assert min_n <= used
        assert used <= max_n

    def num_blocks(num_tokens):
        return (num_tokens + block_size - 1) // block_size

    check_used(0)

    seq_group = create_seq_group(
        seq_prompt_len=prompt_len,
        seq_output_lens=[0],
    )

    check_used(0)

    # Allocate seq
    assert block_manager.can_allocate(seq_group)
    block_manager.allocate(seq_group)

    check_used(num_blocks(prompt_len))

    # Set seq to RUNNING
    seq = seq_group.get_seqs()[0]
    seq.status = SequenceStatus.RUNNING

    seq.data.update_num_computed_tokens(prompt_len)
    check_used(num_blocks(prompt_len))

    # This is how it is computed in BlockSpaceManagerV2.__init__
    sliding_blocks = (sliding_window // block_size) + 2
    # Plus one block for the null block
    sliding_blocks += 1

    # Append tokens to the sequence
    for token_id in range(num_slots_to_append):
        seq.append_token_id(token_id, {token_id: Logprob(0.0)})
        seq.data.update_num_computed_tokens(1)
        block_manager.append_slots(seq, num_lookahead_slots=0)
        if prompt_len < sliding_window + 10:
            check_used(0, sliding_blocks + 1)
        else:
            check_used(sliding_blocks, sliding_blocks + 1)
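
# Worked example of the sliding-window block budget above (illustrative):
# with sliding_window=20 and block_size=8, sliding_blocks =
# (20 // 8) + 2 + 1 = 5, so once the prompt is longer than the window the
# sequence should settle at 5 or 6 used blocks no matter how many tokens are
# appended.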