1
0

test_model_runner.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. from typing import List
  2. import pytest
  3. import torch
  4. from aphrodite.common.sequence import (SamplingParams, SequenceData,
  5. SequenceGroupMetadata)
  6. from aphrodite.common.utils import get_open_port
  7. from aphrodite.distributed.parallel_state import (
  8. ensure_model_parallel_initialized, init_distributed_environment)
  9. from aphrodite.engine.args_tools import EngineArgs
  10. from aphrodite.modeling.sampling_metadata import SamplingMetadata
  11. from aphrodite.worker.model_runner import ModelRunner, _get_graph_batch_size
  12. def _create_model_runner(model: str, *args, **kwargs) -> ModelRunner:
  13. engine_args = EngineArgs(model, *args, **kwargs)
  14. engine_config = engine_args.create_engine_config()
  15. model_runner = ModelRunner(
  16. model_config=engine_config.model_config,
  17. parallel_config=engine_config.parallel_config,
  18. scheduler_config=engine_config.scheduler_config,
  19. device_config=engine_config.device_config,
  20. cache_config=engine_config.cache_config,
  21. load_config=engine_config.load_config,
  22. lora_config=engine_config.lora_config,
  23. prompt_adapter_config=engine_config.prompt_adapter_config,
  24. is_driver_worker=True,
  25. )
  26. return model_runner
  27. @pytest.mark.parametrize("batch_size", list(range(1, 257)))
  28. def test_prepare_prompt(batch_size):
  29. model_runner = _create_model_runner(
  30. "facebook/opt-125m",
  31. max_num_batched_tokens=100000,
  32. max_num_seqs=100000,
  33. enable_chunked_prefill=False,
  34. )
  35. seq_lens: List[int] = []
  36. seq_group_metadata_list: List[SequenceGroupMetadata] = []
  37. block_tables = {0: [1]}
  38. for i in range(batch_size):
  39. # make sure all tokens fit into one block
  40. seq_len = i % (model_runner.block_size - 1) + 1
  41. seq_lens.append(seq_len)
  42. seq_data = SequenceData.from_seqs(range(seq_len))
  43. seq_group_metadata = SequenceGroupMetadata(
  44. request_id=f"test_{i}",
  45. is_prompt=True,
  46. seq_data={0: seq_data},
  47. sampling_params=SamplingParams(temperature=0),
  48. block_tables=block_tables,
  49. )
  50. assert seq_group_metadata.token_chunk_size == seq_data.get_len()
  51. seq_group_metadata_list.append(seq_group_metadata)
  52. expected_selected_token_indices = []
  53. selected_token_start_idx = 0
  54. for seq_len in seq_lens:
  55. expected_selected_token_indices.append(selected_token_start_idx +
  56. seq_len - 1)
  57. selected_token_start_idx += seq_len
  58. model_input = model_runner._prepare_model_input_tensors(
  59. seq_group_metadata_list)
  60. input_tokens = model_input.input_tokens
  61. input_positions = model_input.input_positions
  62. attn_metadata = model_input.attn_metadata
  63. return_seq_lens = model_input.seq_lens
  64. slot_mapping = attn_metadata.slot_mapping
  65. assert return_seq_lens == seq_lens
  66. assert len(slot_mapping) == len(input_tokens)
  67. # Verify input metadata is correct for prompts.
  68. device = model_runner.device
  69. assert attn_metadata.num_prefills > 0
  70. assert attn_metadata.num_decode_tokens == 0
  71. torch.testing.assert_close(
  72. attn_metadata.seq_lens_tensor,
  73. torch.tensor(seq_lens, device=device, dtype=torch.int))
  74. assert attn_metadata.seq_lens == seq_lens
  75. assert attn_metadata.max_prefill_seq_len == max(seq_lens)
  76. assert attn_metadata.max_decode_seq_len == 0
  77. # Test subquery start locs.
  78. start_idx = 0
  79. start_loc = [start_idx]
  80. for seq_len in seq_lens:
  81. start_idx += seq_len
  82. start_loc.append(start_idx)
  83. torch.testing.assert_close(
  84. attn_metadata.query_start_loc,
  85. torch.tensor(start_loc, dtype=torch.int32, device=device))
  86. # Test seq start locs. Note that for normal prefill it is
  87. # equivalent to query_start_loc.
  88. start_idx = 0
  89. seq_start_loc = [start_idx]
  90. for seq_len in seq_lens:
  91. start_idx += seq_len
  92. seq_start_loc.append(start_idx)
  93. torch.testing.assert_close(
  94. attn_metadata.seq_start_loc,
  95. torch.tensor(start_loc, dtype=torch.int32, device=device))
  96. torch.testing.assert_close(
  97. attn_metadata.context_lens_tensor,
  98. torch.zeros(attn_metadata.context_lens_tensor.shape[0],
  99. dtype=torch.int,
  100. device=device))
  101. expected = torch.tensor([[] for _ in range(len(seq_group_metadata_list))],
  102. dtype=torch.int32,
  103. device=model_runner.device)
  104. torch.testing.assert_close(attn_metadata.block_tables, expected)
  105. # Cuda graph should not be used for prerill.
  106. assert attn_metadata.use_cuda_graph is False
  107. assert len(input_tokens) == sum(seq_lens)
  108. assert len(input_positions) == sum(seq_lens)
  109. torch.testing.assert_close(input_tokens, input_positions)
  110. sampling_metadata = SamplingMetadata.prepare(
  111. seq_group_metadata_list,
  112. seq_lens,
  113. query_lens=seq_lens,
  114. device=model_runner.device,
  115. pin_memory=model_runner.pin_memory)
  116. assert len(input_tokens) == sum(seq_lens)
  117. assert len(input_positions) == sum(seq_lens)
  118. actual = sampling_metadata.selected_token_indices
  119. expected = torch.tensor(expected_selected_token_indices,
  120. device=actual.device,
  121. dtype=actual.dtype)
  122. torch.testing.assert_close(actual, expected)
  123. torch.allclose(input_tokens, input_positions)
  124. actual = sampling_metadata.selected_token_indices
  125. expected = torch.tensor(expected_selected_token_indices,
  126. device=actual.device,
  127. dtype=actual.dtype)
  128. torch.testing.assert_close(actual, expected)
  129. @pytest.mark.parametrize("batch_size", list(range(1, 257)))
  130. def test_prepare_decode_cuda_graph(batch_size):
  131. model_runner = _create_model_runner(
  132. "facebook/opt-125m",
  133. seed=0,
  134. dtype="float16",
  135. enforce_eager=False,
  136. max_num_batched_tokens=100000,
  137. max_num_seqs=100000,
  138. enable_chunked_prefill=False,
  139. )
  140. context_lens: List[int] = []
  141. seq_group_metadata_list: List[SequenceGroupMetadata] = []
  142. # Assume each seq group finishes prefill.
  143. for i in range(batch_size):
  144. # make sure all tokens fit into one block
  145. context_len = i % (model_runner.block_size - 1) + 1
  146. context_lens.append(context_len)
  147. seq_data = SequenceData.from_seqs(range(context_len))
  148. seq_data.update_num_computed_tokens(context_len)
  149. # Append one token ID since prefill is finished.
  150. seq_data.append_token_id(1, 0)
  151. seq_group_metadata = SequenceGroupMetadata(
  152. request_id=f"test_{i}",
  153. is_prompt=False,
  154. seq_data={0: seq_data},
  155. sampling_params=SamplingParams(temperature=0),
  156. block_tables={0: [1]},
  157. )
  158. assert seq_group_metadata.token_chunk_size == 1
  159. seq_group_metadata_list.append(seq_group_metadata)
  160. model_input = model_runner._prepare_model_input_tensors(
  161. seq_group_metadata_list)
  162. input_tokens, input_positions, attn_metadata, slot_mapping = (
  163. model_input.input_tokens, model_input.input_positions,
  164. model_input.attn_metadata, model_input.attn_metadata.slot_mapping)
  165. assert len(slot_mapping) == len(input_tokens)
  166. expected_bs = _get_graph_batch_size(len(seq_group_metadata_list))
  167. # Verify input metadata is correct for prompts.
  168. device = model_runner.device
  169. assert attn_metadata.num_prefills == 0
  170. assert attn_metadata.num_prefill_tokens == 0
  171. seq_lens = [context_len + 1 for context_len in context_lens]
  172. # seq_lens are padded to expected_bs
  173. for _ in range(expected_bs - len(seq_lens)):
  174. seq_lens.append(1)
  175. assert attn_metadata.seq_lens == seq_lens
  176. assert attn_metadata.num_decode_tokens == len(seq_lens)
  177. start_idx = 0
  178. start_loc = [start_idx]
  179. for _ in context_lens:
  180. # decode has only 1 token for query.
  181. start_idx += 1
  182. start_loc.append(start_idx)
  183. torch.testing.assert_close(
  184. attn_metadata.query_start_loc,
  185. torch.tensor(start_loc, dtype=torch.int32, device=device))
  186. start_idx = 0
  187. seq_start_loc = [start_idx]
  188. for seq_len in seq_lens:
  189. start_idx += seq_len
  190. seq_start_loc.append(start_idx)
  191. torch.testing.assert_close(
  192. attn_metadata.seq_start_loc,
  193. torch.tensor(seq_start_loc, dtype=torch.int32, device=device))
  194. torch.testing.assert_close(
  195. attn_metadata.context_lens_tensor,
  196. torch.tensor(context_lens, dtype=torch.int, device=device))
  197. assert attn_metadata.max_decode_seq_len == max(seq_lens)
  198. torch.testing.assert_close(
  199. attn_metadata.seq_lens_tensor[:len(seq_lens)],
  200. torch.tensor(seq_lens, dtype=torch.int, device=device))
  201. # block table's first index corresponds to each batch, meaning in
  202. # decoding it is each token.
  203. assert attn_metadata.block_tables.shape[0] == len(input_tokens)
  204. # Block table's second dim correspondsd to each token's block number.
  205. # It is padded up to
  206. assert attn_metadata.block_tables.shape[1] == (
  207. model_runner.get_max_block_per_batch())
  208. assert attn_metadata.use_cuda_graph is True
  209. assert len(input_tokens) == expected_bs
  210. assert len(input_positions) == expected_bs
  211. torch.allclose(input_tokens, input_positions)
  212. # Verify Sampling
  213. expected_selected_token_indices = []
  214. selected_token_start_idx = 0
  215. for _ in context_lens:
  216. expected_selected_token_indices.append(selected_token_start_idx)
  217. selected_token_start_idx += 1
  218. sampling_metadata = SamplingMetadata.prepare(
  219. seq_group_metadata_list,
  220. seq_lens,
  221. # query lens is all 1 for decode.
  222. query_lens=[1 for _ in range(len(context_lens))],
  223. device=model_runner.device,
  224. pin_memory=model_runner.pin_memory)
  225. actual = sampling_metadata.selected_token_indices
  226. expected = torch.tensor(expected_selected_token_indices,
  227. device=actual.device,
  228. dtype=actual.dtype)
  229. torch.testing.assert_close(actual, expected)
  230. def test_empty_seq_group():
  231. """Verify prepare prompt and decode returns empty output."""
  232. model_runner = _create_model_runner(
  233. "facebook/opt-125m",
  234. seed=0,
  235. dtype="float16",
  236. enforce_eager=False,
  237. )
  238. seq_group_metadata_list: List[SequenceGroupMetadata] = []
  239. model_input = model_runner._prepare_model_input_tensors(
  240. seq_group_metadata_list)
  241. input_tokens, input_positions, attn_metadata = (
  242. model_input.input_tokens,
  243. model_input.input_positions,
  244. model_input.attn_metadata,
  245. )
  246. assert input_tokens is None
  247. assert input_positions is None
  248. assert attn_metadata is None
  249. model_input = model_runner._prepare_model_input_tensors(
  250. seq_group_metadata_list)
  251. (input_tokens, input_positions, attn_metadata, return_seq_lens) = (
  252. model_input.input_tokens,
  253. model_input.input_positions,
  254. model_input.attn_metadata,
  255. model_input.seq_lens,
  256. )
  257. assert input_tokens is None
  258. assert input_positions is None
  259. assert attn_metadata is None
  260. assert return_seq_lens is None
  261. @pytest.fixture
  262. def distributed_init():
  263. init_distributed_environment(
  264. world_size=1,
  265. rank=0,
  266. distributed_init_method=f"tcp://127.0.0.1:{get_open_port()}",
  267. local_rank=0)
  268. ensure_model_parallel_initialized(1, 1)
  269. @pytest.mark.parametrize("batch_size", list(range(2, 128)))
  270. @pytest.mark.parametrize("enforce_eager", [True, False])
  271. def test_hybrid_batches(batch_size, enforce_eager, distributed_init):
  272. model_runner = _create_model_runner(
  273. "facebook/opt-125m",
  274. seed=0,
  275. dtype="float16",
  276. enforce_eager=enforce_eager,
  277. max_num_batched_tokens=100000,
  278. max_num_seqs=100000,
  279. enable_chunked_prefill=True,
  280. )
  281. # Add prefill requests.
  282. seq_lens: List[int] = []
  283. seq_group_metadata_list: List[SequenceGroupMetadata] = []
  284. prefill_metadata_list: List[SequenceGroupMetadata] = []
  285. decode_metadata_list: List[SequenceGroupMetadata] = []
  286. block_tables = {0: [1]}
  287. prefill_batch_size = batch_size // 2
  288. decode_batch_size = batch_size - prefill_batch_size
  289. for i in range(prefill_batch_size):
  290. # make sure all tokens fit into one block
  291. seq_len = i % (model_runner.block_size - 1) + 1
  292. seq_lens.append(seq_len)
  293. seq_data = SequenceData.from_seqs(range(seq_len))
  294. seq_group_metadata = SequenceGroupMetadata(
  295. request_id=f"test_{i}",
  296. is_prompt=True,
  297. seq_data={0: seq_data},
  298. sampling_params=SamplingParams(temperature=0),
  299. block_tables=block_tables,
  300. )
  301. assert seq_group_metadata.token_chunk_size == seq_data.get_len()
  302. seq_group_metadata_list.append(seq_group_metadata)
  303. prefill_metadata_list.append(seq_group_metadata)
  304. # Add decode requests
  305. for i in range(prefill_batch_size, batch_size):
  306. # make sure all tokens fit into one block
  307. context_len = i % (model_runner.block_size - 1) + 1
  308. seq_data = SequenceData.from_seqs(range(context_len))
  309. seq_data.append_token_id(1, 0)
  310. seq_data.update_num_computed_tokens(context_len)
  311. seq_group_metadata = SequenceGroupMetadata(
  312. request_id=f"test_{i}",
  313. is_prompt=False,
  314. seq_data={0: seq_data},
  315. sampling_params=SamplingParams(temperature=0),
  316. block_tables={0: [1]},
  317. )
  318. assert seq_group_metadata.token_chunk_size == 1
  319. seq_group_metadata_list.append(seq_group_metadata)
  320. decode_metadata_list.append(seq_group_metadata)
  321. model_input = model_runner.prepare_model_input(seq_group_metadata_list)
  322. (input_tokens, input_positions, attn_metadata) = (
  323. model_input.input_tokens,
  324. model_input.input_positions,
  325. model_input.attn_metadata,
  326. )
  327. prefill_meta_actual = attn_metadata.prefill_metadata
  328. decode_meta_actual = attn_metadata.decode_metadata
  329. assert len(attn_metadata.slot_mapping) == len(input_tokens)
  330. assert len(input_positions) == len(input_tokens)
  331. assert attn_metadata.num_prefills == prefill_batch_size
  332. assert attn_metadata.num_decode_tokens == decode_batch_size
  333. assert attn_metadata.num_prefill_tokens == sum(seq_lens)
  334. # Verify attn metadata is consistent. We don't need to test individual
  335. # values here because they are tested above.
  336. attn_metadata = model_runner._prepare_model_input_tensors(
  337. seq_group_metadata_list).attn_metadata
  338. for attr_expected, attr_actual in zip(vars(attn_metadata.prefill_metadata),
  339. vars(prefill_meta_actual)):
  340. assert attr_expected[1] == attr_actual[1]
  341. for attr_expected, attr_actual in zip(vars(attn_metadata.decode_metadata),
  342. vars(decode_meta_actual)):
  343. assert attr_expected[1] == attr_actual[1]