# cpu_model_runner.py 16 KB


from typing import Dict, List, Optional, Tuple

import torch

from aphrodite.attention import AttentionMetadata, get_attn_backend
from aphrodite.common.config import (
    DeviceConfig,
    LoRAConfig,
    ModelConfig,
    ParallelConfig,
    SchedulerConfig,
)
from aphrodite.common.sampling_params import SamplingParams, SamplingType
from aphrodite.common.sequence import (
    SamplerOutput,
    SequenceData,
    SequenceGroupMetadata,
)
from aphrodite.common.utils import make_tensor_with_pad, maybe_expand_dim
from aphrodite.distributed import broadcast_tensor_dict
from aphrodite.modeling import SamplingMetadata
from aphrodite.modeling.loader import get_model
  21. _PAD_SLOT_ID = -1
  22. class CPUModelRunner:
  23. def __init__(
  24. self,
  25. model_config: ModelConfig,
  26. parallel_config: ParallelConfig,
  27. scheduler_config: SchedulerConfig,
  28. device_config: DeviceConfig,
  29. lora_config: Optional[LoRAConfig],
  30. kv_cache_dtype: Optional[str] = "auto",
  31. is_driver_worker: bool = False,
  32. *args,
  33. **kwargs,
  34. ):
  35. self.model_config = model_config
  36. self.parallel_config = parallel_config
  37. self.scheduler_config = scheduler_config
  38. self.lora_config = lora_config
  39. self.is_driver_worker = is_driver_worker
  40. # model_config can be None in tests/samplers/test_sampler.py.
  41. # FIXME: This is a hack to make the tests work. Refactor this.
  42. self.sliding_window = (model_config.get_sliding_window()
  43. if model_config is not None else None)
  44. self.device_config = (device_config
  45. if device_config is not None else DeviceConfig())
  46. self.device = self.device_config.device
  47. self.model = None
  48. self.block_size = None # Set after initial profiling.
  49. self.kv_cache_dtype = kv_cache_dtype
  50. self.attn_backend = get_attn_backend(
  51. self.model_config.dtype if model_config is not None else None)
  52. def load_model(self) -> None:
  53. self.model = get_model(self.model_config,
  54. self.device_config,
  55. lora_config=self.lora_config,
  56. parallel_config=self.parallel_config,
  57. scheduler_config=self.scheduler_config)
  58. def _prepare_prompt(
  59. self,
  60. seq_group_metadata_list: List[SequenceGroupMetadata],
  61. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata, List[int]]:
  62. assert len(seq_group_metadata_list) > 0
  63. input_tokens: List[int] = []
  64. input_positions: List[int] = []
  65. slot_mapping: List[int] = []
  66. prompt_lens: List[int] = []
  67. for seq_group_metadata in seq_group_metadata_list:
  68. assert seq_group_metadata.is_prompt
  69. seq_ids = list(seq_group_metadata.seq_data.keys())
  70. assert len(seq_ids) == 1
  71. seq_id = seq_ids[0]
  72. seq_data = seq_group_metadata.seq_data[seq_id]
  73. prompt_tokens = seq_data.get_token_ids()
  74. computed_len = seq_data.get_num_computed_tokens()
  75. prompt_len = len(prompt_tokens)
  76. prompt_lens.append(prompt_len) # Prompt token num
  77. input_tokens.extend(prompt_tokens) # Token ids
  78. # Token position ids
  79. # NOTE: Here we assume that the first token in the prompt
  80. # is always the first token in the sequence.
  81. input_positions.extend(list(range(computed_len, prompt_len)))
  82. # Compute the slot mapping.
  83. block_table = seq_group_metadata.block_tables[seq_id]
  84. # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
  85. # where start_idx is max(0, prompt_len - sliding_window).
  86. # For example, if the prompt len is 10, sliding window is 8, and
  87. # block size is 4, the first two tokens are masked and the slot
  88. # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
  89. start_idx = 0
  90. if self.sliding_window is not None:
  91. start_idx = max(0, prompt_len - self.sliding_window)
  92. for i in range(computed_len, prompt_len):
  93. if i < start_idx:
  94. slot_mapping.append(_PAD_SLOT_ID)
  95. continue
  96. block_number = block_table[i //
  97. self.block_size] # type: ignore
  98. block_offset = i % self.block_size # type: ignore
  99. slot = block_number * self.block_size + block_offset
  100. slot_mapping.append(slot)
  101. num_prompt_tokens = len(input_tokens)
  102. input_tokens = torch.tensor(input_tokens,
  103. dtype=torch.long,
  104. device=self.device) # type: ignore
  105. input_positions = torch.tensor(input_positions,
  106. dtype=torch.long,
  107. device=self.device) # type: ignore
  108. slot_mapping = torch.tensor(slot_mapping,
  109. dtype=torch.long,
  110. device=self.device) # type: ignore
  111. attn_metadata = self.attn_backend.make_metadata(
  112. is_prompt=True,
  113. prompt_lens=prompt_lens,
  114. num_prefills=len(prompt_lens),
  115. num_prefill_tokens=num_prompt_tokens,
  116. num_decode_tokens=0,
  117. prefill_metadata=None,
  118. decode_metadata=None,
  119. max_context_len=None,
  120. context_lens=None,
  121. block_tables=torch.tensor([]),
  122. slot_mapping=slot_mapping,
  123. kv_cache_dtype=self.kv_cache_dtype,
  124. )
  125. return (
  126. input_tokens,
  127. input_positions,
  128. attn_metadata,
  129. prompt_lens,
  130. )
  131. def _prepare_decode(
  132. self,
  133. seq_group_metadata_list: List[SequenceGroupMetadata],
  134. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata]:
  135. assert len(seq_group_metadata_list) > 0
  136. input_tokens: List[int] = []
  137. input_positions: List[int] = []
  138. slot_mapping: List[int] = []
  139. context_lens: List[int] = []
  140. block_tables: List[List[int]] = []
  141. for seq_group_metadata in seq_group_metadata_list:
  142. assert not seq_group_metadata.is_prompt
  143. assert seq_group_metadata.token_chunk_size == 1
  144. seq_ids = list(seq_group_metadata.seq_data.keys())
  145. for seq_id in seq_ids:
  146. seq_data = seq_group_metadata.seq_data[seq_id]
  147. generation_token = seq_data.get_last_token_id()
  148. input_tokens.append(generation_token)
  149. seq_len = seq_data.get_len()
  150. position = seq_len - 1
  151. input_positions.append(position)
  152. context_len = seq_len if self.sliding_window is None else min(
  153. seq_len, self.sliding_window)
  154. context_lens.append(context_len)
  155. block_table = seq_group_metadata.block_tables[seq_id]
  156. block_number = block_table[position // self.block_size]
  157. block_offset = position % self.block_size
  158. slot = block_number * self.block_size + block_offset
  159. slot_mapping.append(slot)
  160. if self.sliding_window is not None:
  161. sliding_window_blocks = (self.sliding_window //
  162. self.block_size)
  163. block_table = block_table[-sliding_window_blocks:]
  164. block_tables.append(block_table)
  165. max_context_len = max(context_lens)
  166. input_tokens = torch.tensor(input_tokens,
  167. dtype=torch.long,
  168. device=self.device)
  169. input_positions = torch.tensor(input_positions,
  170. dtype=torch.long,
  171. device=self.device)
  172. slot_mapping = torch.tensor(slot_mapping,
  173. dtype=torch.long,
  174. device=self.device)
  175. context_lens = torch.tensor(context_lens,
  176. dtype=torch.int,
  177. device=self.device)
  178. max_block_table_len = max(
  179. len(block_table) for block_table in block_tables)
  180. block_tables = make_tensor_with_pad(
  181. block_tables,
  182. max_len=max_block_table_len,
  183. pad=0,
  184. dtype=torch.int,
  185. device=self.device,
  186. )
  187. attn_metadata = self.attn_backend.make_metadata(
  188. is_prompt=False,
  189. slot_mapping=slot_mapping,
  190. prompt_lens=None,
  191. num_prefill_tokens=0,
  192. num_decode_tokens=len(input_tokens),
  193. max_context_len=max_context_len,
  194. num_prefills=0,
  195. prefill_metadata=None,
  196. decode_metadata=None,
  197. context_lens=context_lens,
  198. block_tables=block_tables,
  199. kv_cache_dtype=self.kv_cache_dtype,
  200. )
  201. return (
  202. input_tokens,
  203. input_positions,
  204. attn_metadata,
  205. )
  206. def _prepare_sample(
  207. self,
  208. seq_group_metadata_list: List[SequenceGroupMetadata],
  209. prompt_lens: List[int],
  210. ) -> SamplingMetadata:
  211. seq_groups: List[Tuple[List[int], SamplingParams]] = []
  212. selected_token_indices: List[int] = []
  213. generators: List[torch.Generator] = []
  214. selected_token_start_idx = 0
  215. categorized_sample_indices = {t: [] for t in SamplingType}
  216. categorized_sample_indices_start_idx = 0
  217. categorized_sampled_token_indices_start_idx = 0
  218. for i, seq_group_metadata in enumerate(seq_group_metadata_list):
  219. seq_ids = list(seq_group_metadata.seq_data.keys())
  220. sampling_params = seq_group_metadata.sampling_params
  221. seq_groups.append((seq_ids, sampling_params))
  222. if seq_group_metadata.is_prompt:
  223. assert len(seq_ids) == 1
  224. subquery_len = prompt_lens[i]
  225. if sampling_params.prompt_logprobs is not None:
  226. # NOTE: prompt token positions do not need sample, skip
  227. categorized_sample_indices_start_idx += subquery_len - 1
  228. categorized_sample_indices[
  229. sampling_params.sampling_type].append([
  230. categorized_sample_indices_start_idx,
  231. categorized_sampled_token_indices_start_idx
  232. ])
  233. categorized_sample_indices_start_idx += 1
  234. categorized_sampled_token_indices_start_idx += 1
  235. if sampling_params.prompt_logprobs is not None:
  236. selected_token_indices.extend(
  237. range(selected_token_start_idx,
  238. selected_token_start_idx + subquery_len - 1))
  239. selected_token_indices.append(selected_token_start_idx +
  240. subquery_len - 1)
  241. selected_token_start_idx += subquery_len
  242. if sampling_params.seed is not None:
  243. seq_group_metadata.state.generator = torch.Generator(
  244. device=self.device).manual_seed(sampling_params.seed)
  245. else:
  246. num_seqs = len(seq_ids)
  247. selected_token_indices.extend(
  248. range(selected_token_start_idx,
  249. selected_token_start_idx + num_seqs))
  250. selected_token_start_idx += num_seqs
  251. categorized_sample_indices[
  252. sampling_params.sampling_type].extend(
  253. zip(
  254. range(
  255. categorized_sample_indices_start_idx,
  256. categorized_sample_indices_start_idx +
  257. num_seqs),
  258. range(
  259. categorized_sampled_token_indices_start_idx,
  260. categorized_sampled_token_indices_start_idx +
  261. num_seqs)))
  262. categorized_sample_indices_start_idx += num_seqs
  263. categorized_sampled_token_indices_start_idx += num_seqs
  264. if sampling_params.seed is not None:
  265. generators.append(seq_group_metadata.state.generator)
  266. selected_token_indices = torch.tensor(selected_token_indices,
  267. dtype=torch.long)
  268. categorized_sample_indices = {
  269. t: maybe_expand_dim(torch.tensor(seq_ids, dtype=torch.int), 2, 2)
  270. for t, seq_ids in categorized_sample_indices.items()
  271. }
  272. seq_data: Dict[int, SequenceData] = {}
  273. for seq_group_metadata in seq_group_metadata_list:
  274. seq_data.update(seq_group_metadata.seq_data)
  275. sampling_metadata = SamplingMetadata(
  276. seq_groups=seq_groups,
  277. seq_data=seq_data,
  278. prompt_lens=prompt_lens,
  279. selected_token_indices=selected_token_indices,
  280. categorized_sample_indices=categorized_sample_indices,
  281. generators=generators,
  282. )
  283. return sampling_metadata
  284. def prepare_input_tensors(
  285. self,
  286. seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
  287. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata,
  288. SamplingMetadata]:
  289. if self.is_driver_worker:
  290. # NOTE: We assume that all sequences in the group are all prompts or
  291. # all decodes.
  292. is_prompt = seq_group_metadata_list[0].is_prompt
  293. # Prepare input tensors.
  294. if is_prompt:
  295. (input_tokens, input_positions, attn_metadata,
  296. prompt_lens) = self._prepare_prompt(seq_group_metadata_list)
  297. else:
  298. (input_tokens, input_positions,
  299. attn_metadata) = self._prepare_decode(seq_group_metadata_list)
  300. prompt_lens = []
  301. sampling_metadata = self._prepare_sample(seq_group_metadata_list,
  302. prompt_lens)
  303. # Broadcast the metadata.
  304. metadata_dict = {
  305. "input_tokens": input_tokens,
  306. "input_positions": input_positions,
  307. "selected_token_indices":
  308. sampling_metadata.selected_token_indices,
  309. }
  310. metadata_dict.update(attn_metadata.asdict_zerocopy())
  311. broadcast_tensor_dict(metadata_dict, src=0)
  312. else:
  313. metadata_dict = broadcast_tensor_dict(src=0)
  314. input_tokens = metadata_dict.pop("input_tokens")
  315. input_positions = metadata_dict.pop("input_positions")
  316. selected_token_indices = metadata_dict.pop(
  317. "selected_token_indices")
  318. attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
  319. sampling_metadata = SamplingMetadata(
  320. seq_groups=None,
  321. seq_data=None,
  322. prompt_lens=None,
  323. selected_token_indices=selected_token_indices,
  324. categorized_sample_indices=None,
  325. generators=None,
  326. perform_sampling=False,
  327. )
  328. return (
  329. input_tokens,
  330. input_positions,
  331. attn_metadata,
  332. sampling_metadata,
  333. )
  334. @torch.inference_mode()
  335. def execute_model(
  336. self,
  337. seq_group_metadata_list: Optional[List[SequenceGroupMetadata]],
  338. kv_caches: List[torch.Tensor],
  339. ) -> Optional[SamplerOutput]:
  340. (input_tokens, input_positions, attn_metadata, sampling_metadata
  341. ) = self.prepare_input_tensors(seq_group_metadata_list)
  342. model_executable = self.model
  343. execute_model_kwargs = {
  344. "input_ids": input_tokens,
  345. "positions": input_positions,
  346. "kv_caches": kv_caches,
  347. "attn_metadata": attn_metadata,
  348. }
  349. hidden_states = model_executable(**execute_model_kwargs)
  350. # Compute the logits.
  351. logits = self.model.compute_logits(hidden_states, sampling_metadata)
  352. # Only perform sampling in the driver worker.
  353. if not sampling_metadata.perform_sampling:
  354. return None
  355. # Sample the next token.
  356. output = self.model.sample(
  357. logits=logits,
  358. sampling_metadata=sampling_metadata,
  359. )
  360. return output