cpu_model_runner.py 18 KB


  1. from typing import Dict, List, Optional, Tuple
  2. import torch
  3. from torch import nn
  4. from aphrodite.attention import AttentionMetadata, get_attn_backend
  5. from aphrodite.common.config import (DeviceConfig, LoadConfig, LoRAConfig,
  6. ModelConfig, ParallelConfig,
  7. SchedulerConfig, VisionLanguageConfig)
  8. from aphrodite.common.sampling_params import SamplingParams, SamplingType
  9. from aphrodite.common.sequence import (SamplerOutput, SequenceData,
  10. SequenceGroupMetadata)
  11. from aphrodite.common.utils import make_tensor_with_pad, maybe_expand_dim
  12. from aphrodite.distributed import broadcast_tensor_dict
  13. from aphrodite.modeling import SamplingMetadata
  14. from aphrodite.modeling.model_loader import get_model
  15. _PAD_SLOT_ID = -1
  16. class CPUModelRunner:
  17. def __init__(
  18. self,
  19. model_config: ModelConfig,
  20. parallel_config: ParallelConfig,
  21. scheduler_config: SchedulerConfig,
  22. device_config: DeviceConfig,
  23. load_config: LoadConfig,
  24. lora_config: Optional[LoRAConfig],
  25. vision_language_config: Optional[VisionLanguageConfig],
  26. kv_cache_dtype: Optional[str] = "auto",
  27. is_driver_worker: bool = False,
  28. *args,
  29. **kwargs,
  30. ):
  31. self.model_config = model_config
  32. self.parallel_config = parallel_config
  33. self.scheduler_config = scheduler_config
  34. self.lora_config = lora_config
  35. self.vision_language_config = vision_language_config
  36. self.load_config = load_config
  37. self.is_driver_worker = is_driver_worker
  38. # model_config can be None in tests/samplers/test_sampler.py.
  39. # FIXME: This is a hack to make the tests work. Refactor this.
  40. self.sliding_window = (model_config.get_sliding_window()
  41. if model_config is not None else None)
  42. self.device_config = (device_config
  43. if device_config is not None else DeviceConfig())
  44. self.device = self.device_config.device
  45. self.kv_cache_dtype = kv_cache_dtype
  46. self.attn_backend = get_attn_backend(
  47. self.model_config.dtype if model_config is not None else None)
  48. # Lazy initialization.
  49. self.model: nn.Module # Set after init_Model
  50. self.block_size: int # Set after initial profiling.
  51. def load_model(self) -> None:
  52. self.model = get_model(
  53. model_config=self.model_config,
  54. load_config=self.load_config,
  55. device_config=self.device_config,
  56. vision_language_config=self.vision_language_config,
  57. lora_config=self.lora_config,
  58. parallel_config=self.parallel_config,
  59. scheduler_config=self.scheduler_config)
  60. def _prepare_prompt(
  61. self,
  62. seq_group_metadata_list: List[SequenceGroupMetadata],
  63. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata, List[int],
  64. Optional[torch.Tensor]]:
  65. assert len(seq_group_metadata_list) > 0
  66. input_tokens: List[int] = []
  67. input_positions: List[int] = []
  68. slot_mapping: List[int] = []
  69. prompt_lens: List[int] = []
  70. multi_modal_input_list: List[torch.Tensor] = []
  71. for seq_group_metadata in seq_group_metadata_list:
  72. assert seq_group_metadata.is_prompt
  73. seq_ids = list(seq_group_metadata.seq_data.keys())
  74. assert len(seq_ids) == 1
  75. seq_id = seq_ids[0]
  76. seq_data = seq_group_metadata.seq_data[seq_id]
  77. prompt_tokens = seq_data.get_token_ids()
  78. computed_len = seq_data.get_num_computed_tokens()
  79. prompt_len = len(prompt_tokens)
  80. prompt_lens.append(prompt_len) # Prompt token num
  81. input_tokens.extend(prompt_tokens) # Token ids
  82. # Token position ids
  83. # NOTE: Here we assume that the first token in the prompt
  84. # is always the first token in the sequence.
  85. input_positions.extend(list(range(computed_len, prompt_len)))
  86. if seq_group_metadata.multi_modal_data:
  87. multi_modal_input_list.append(
  88. seq_group_metadata.multi_modal_data.data)
  89. # Compute the slot mapping.
  90. block_table = seq_group_metadata.block_tables[seq_id]
  91. # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
  92. # where start_idx is max(0, prompt_len - sliding_window).
  93. # For example, if the prompt len is 10, sliding window is 8, and
  94. # block size is 4, the first two tokens are masked and the slot
  95. # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
  96. start_idx = 0
  97. if self.sliding_window is not None:
  98. start_idx = max(0, prompt_len - self.sliding_window)
  99. for i in range(computed_len, prompt_len):
  100. if i < start_idx:
  101. slot_mapping.append(_PAD_SLOT_ID)
  102. continue
  103. block_number = block_table[i //
  104. self.block_size] # type: ignore
  105. block_offset = i % self.block_size # type: ignore
  106. slot = block_number * self.block_size + block_offset
  107. slot_mapping.append(slot)
  108. if multi_modal_input_list:
  109. assert self.vision_language_config, (
  110. "Multi-modal inputs are only supported by "
  111. "vision language models.")
  112. multi_modal_input = torch.cat(multi_modal_input_list,
  113. dim=0).to(self.device)
  114. else:
  115. multi_modal_input = None
  116. num_prompt_tokens = len(input_tokens)
  117. input_tokens = torch.tensor(input_tokens,
  118. dtype=torch.long,
  119. device=self.device) # type: ignore
  120. input_positions = torch.tensor(input_positions,
  121. dtype=torch.long,
  122. device=self.device) # type: ignore
  123. slot_mapping = torch.tensor(slot_mapping,
  124. dtype=torch.long,
  125. device=self.device) # type: ignore
  126. attn_metadata = self.attn_backend.make_metadata(
  127. is_prompt=True,
  128. prompt_lens=prompt_lens,
  129. num_prefills=len(prompt_lens),
  130. num_prefill_tokens=num_prompt_tokens,
  131. num_decode_tokens=0,
  132. prefill_metadata=None,
  133. decode_metadata=None,
  134. max_context_len=None,
  135. context_lens=None,
  136. block_tables=torch.tensor([]),
  137. slot_mapping=slot_mapping,
  138. kv_cache_dtype=self.kv_cache_dtype,
  139. )
  140. return (input_tokens, input_positions, attn_metadata, prompt_lens,
  141. multi_modal_input)
  142. def _prepare_decode(
  143. self,
  144. seq_group_metadata_list: List[SequenceGroupMetadata],
  145. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata]:
  146. assert len(seq_group_metadata_list) > 0
  147. input_tokens: List[int] = []
  148. input_positions: List[int] = []
  149. slot_mapping: List[int] = []
  150. context_lens: List[int] = []
  151. block_tables: List[List[int]] = []
  152. for seq_group_metadata in seq_group_metadata_list:
  153. assert not seq_group_metadata.is_prompt
  154. assert seq_group_metadata.token_chunk_size == 1
  155. seq_ids = list(seq_group_metadata.seq_data.keys())
  156. for seq_id in seq_ids:
  157. seq_data = seq_group_metadata.seq_data[seq_id]
  158. generation_token = seq_data.get_last_token_id()
  159. input_tokens.append(generation_token)
  160. seq_len = seq_data.get_len()
  161. position = seq_len - 1
  162. input_positions.append(position)
  163. context_len = seq_len if self.sliding_window is None else min(
  164. seq_len, self.sliding_window)
  165. context_lens.append(context_len)
  166. block_table = seq_group_metadata.block_tables[seq_id]
  167. block_number = block_table[position // self.block_size]
  168. block_offset = position % self.block_size
  169. slot = block_number * self.block_size + block_offset
  170. slot_mapping.append(slot)
  171. if self.sliding_window is not None:
  172. sliding_window_blocks = (self.sliding_window //
  173. self.block_size)
  174. block_table = block_table[-sliding_window_blocks:]
  175. block_tables.append(block_table)
  176. max_context_len = max(context_lens)
  177. input_tokens = torch.tensor(input_tokens,
  178. dtype=torch.long,
  179. device=self.device)
  180. input_positions = torch.tensor(input_positions,
  181. dtype=torch.long,
  182. device=self.device)
  183. slot_mapping = torch.tensor(slot_mapping,
  184. dtype=torch.long,
  185. device=self.device)
  186. context_lens = torch.tensor(context_lens,
  187. dtype=torch.int,
  188. device=self.device)
  189. max_block_table_len = max(
  190. len(block_table) for block_table in block_tables)
  191. block_tables = make_tensor_with_pad(
  192. block_tables,
  193. max_len=max_block_table_len,
  194. pad=0,
  195. dtype=torch.int,
  196. device=self.device,
  197. )
  198. attn_metadata = self.attn_backend.make_metadata(
  199. is_prompt=False,
  200. slot_mapping=slot_mapping,
  201. prompt_lens=None,
  202. num_prefill_tokens=0,
  203. num_decode_tokens=len(input_tokens),
  204. max_context_len=max_context_len,
  205. num_prefills=0,
  206. prefill_metadata=None,
  207. decode_metadata=None,
  208. context_lens=context_lens,
  209. block_tables=block_tables,
  210. kv_cache_dtype=self.kv_cache_dtype,
  211. )
  212. return (
  213. input_tokens,
  214. input_positions,
  215. attn_metadata,
  216. )
  217. def _prepare_sample(
  218. self,
  219. seq_group_metadata_list: List[SequenceGroupMetadata],
  220. prompt_lens: List[int],
  221. ) -> SamplingMetadata:
  222. seq_groups: List[Tuple[List[int], SamplingParams]] = []
  223. selected_token_indices: List[int] = []
  224. generators: List[torch.Generator] = []
  225. selected_token_start_idx = 0
  226. categorized_sample_indices: Dict[SamplingType,
  227. List[Tuple[int, int]]] = {
  228. t: []
  229. for t in SamplingType
  230. }
  231. categorized_sample_indices_start_idx = 0
  232. categorized_sampled_token_indices_start_idx = 0
  233. for i, seq_group_metadata in enumerate(seq_group_metadata_list):
  234. seq_ids = list(seq_group_metadata.seq_data.keys())
  235. sampling_params = seq_group_metadata.sampling_params
  236. seq_groups.append((seq_ids, sampling_params))
  237. if seq_group_metadata.is_prompt:
  238. assert len(seq_ids) == 1
  239. subquery_len = prompt_lens[i]
  240. if sampling_params.prompt_logprobs is not None:
  241. # NOTE: prompt token positions do not need sample, skip
  242. categorized_sample_indices_start_idx += subquery_len - 1
  243. categorized_sample_indices[
  244. sampling_params.sampling_type].append(
  245. (categorized_sample_indices_start_idx,
  246. categorized_sampled_token_indices_start_idx))
  247. categorized_sample_indices_start_idx += 1
  248. categorized_sampled_token_indices_start_idx += 1
  249. if sampling_params.prompt_logprobs is not None:
  250. selected_token_indices.extend(
  251. range(selected_token_start_idx,
  252. selected_token_start_idx + subquery_len - 1))
  253. selected_token_indices.append(selected_token_start_idx +
  254. subquery_len - 1)
  255. selected_token_start_idx += subquery_len
  256. if sampling_params.seed is not None:
  257. seq_group_metadata.state.generator = torch.Generator(
  258. device=self.device).manual_seed(sampling_params.seed)
  259. else:
  260. num_seqs = len(seq_ids)
  261. selected_token_indices.extend(
  262. range(selected_token_start_idx,
  263. selected_token_start_idx + num_seqs))
  264. selected_token_start_idx += num_seqs
  265. categorized_sample_indices[
  266. sampling_params.sampling_type].extend(
  267. zip(
  268. range(
  269. categorized_sample_indices_start_idx,
  270. categorized_sample_indices_start_idx +
  271. num_seqs),
  272. range(
  273. categorized_sampled_token_indices_start_idx,
  274. categorized_sampled_token_indices_start_idx +
  275. num_seqs)))
  276. categorized_sample_indices_start_idx += num_seqs
  277. categorized_sampled_token_indices_start_idx += num_seqs
  278. if sampling_params.seed is not None:
  279. generators.append(seq_group_metadata.state.generator)
  280. selected_token_indices = torch.tensor(selected_token_indices,
  281. dtype=torch.long)
  282. categorized_sample_indices = {
  283. t: maybe_expand_dim(torch.tensor(seq_ids, dtype=torch.int), 2, 2)
  284. for t, seq_ids in categorized_sample_indices.items()
  285. }
  286. seq_data: Dict[int, SequenceData] = {}
  287. for seq_group_metadata in seq_group_metadata_list:
  288. seq_data.update(seq_group_metadata.seq_data)
  289. sampling_metadata = SamplingMetadata(
  290. seq_groups=seq_groups,
  291. seq_data=seq_data,
  292. prompt_lens=prompt_lens,
  293. selected_token_indices=selected_token_indices,
  294. categorized_sample_indices=categorized_sample_indices,
  295. generators=generators,
  296. )
  297. return sampling_metadata
  298. def prepare_input_tensors(
  299. self,
  300. seq_group_metadata_list: List[SequenceGroupMetadata],
  301. ) -> Tuple[torch.Tensor, torch.Tensor, AttentionMetadata, SamplingMetadata,
  302. Optional[torch.Tensor]]:
  303. multi_modal_input = None
  304. if self.is_driver_worker:
  305. # NOTE: We assume that all sequences in the group are all prompts or
  306. # all decodes.
  307. is_prompt = seq_group_metadata_list[0].is_prompt
  308. # Prepare input tensors.
  309. if is_prompt:
  310. (input_tokens, input_positions, attn_metadata, prompt_lens,
  311. multi_modal_input
  312. ) = self._prepare_prompt(seq_group_metadata_list)
  313. else:
  314. (input_tokens, input_positions,
  315. attn_metadata) = self._prepare_decode(seq_group_metadata_list)
  316. prompt_lens = []
  317. sampling_metadata = self._prepare_sample(seq_group_metadata_list,
  318. prompt_lens)
  319. # Broadcast the metadata.
  320. metadata_dict = {
  321. "input_tokens": input_tokens,
  322. "input_positions": input_positions,
  323. "selected_token_indices":
  324. sampling_metadata.selected_token_indices,
  325. }
  326. metadata_dict.update(attn_metadata.asdict_zerocopy())
  327. broadcast_tensor_dict(metadata_dict, src=0)
  328. else:
  329. metadata_dict = broadcast_tensor_dict(src=0)
  330. input_tokens = metadata_dict.pop("input_tokens")
  331. input_positions = metadata_dict.pop("input_positions")
  332. selected_token_indices = metadata_dict.pop(
  333. "selected_token_indices")
  334. attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
  335. sampling_metadata = SamplingMetadata(
  336. seq_groups=None,
  337. seq_data=None,
  338. prompt_lens=None,
  339. selected_token_indices=selected_token_indices,
  340. categorized_sample_indices=None,
  341. generators=None,
  342. perform_sampling=False,
  343. )
  344. return (input_tokens, input_positions, attn_metadata,
  345. sampling_metadata, multi_modal_input)
  346. @torch.inference_mode()
  347. def execute_model(
  348. self,
  349. seq_group_metadata_list: List[SequenceGroupMetadata],
  350. kv_caches: List[torch.Tensor],
  351. ) -> Optional[SamplerOutput]:
  352. (input_tokens, input_positions, attn_metadata, sampling_metadata,
  353. multi_modal_input
  354. ) = self.prepare_input_tensors(seq_group_metadata_list)
  355. model_executable = self.model
  356. execute_model_kwargs = {
  357. "input_ids": input_tokens,
  358. "positions": input_positions,
  359. "kv_caches": kv_caches,
  360. "attn_metadata": attn_metadata,
  361. }
  362. if self.vision_language_config:
  363. execute_model_kwargs.update({"image_input": multi_modal_input})
  364. hidden_states = model_executable(**execute_model_kwargs)
  365. # Compute the logits.
  366. logits = self.model.compute_logits(hidden_states, sampling_metadata)
  367. # Only perform sampling in the driver worker.
  368. if not sampling_metadata.perform_sampling:
  369. return None
  370. # Sample the next token.
  371. output = self.model.sample(
  372. logits=logits,
  373. sampling_metadata=sampling_metadata,
  374. )
  375. return output