model_runner_base.py

import dataclasses
import pickle
from abc import ABC, abstractmethod
from datetime import datetime
from functools import wraps
from typing import (TYPE_CHECKING, Any, Dict, Generic, Iterable, List,
                    Optional, Type, TypeVar)

import torch
from loguru import logger
from torch import is_tensor

from aphrodite.common.sequence import (IntermediateTensors,
                                       SequenceGroupMetadata)
from aphrodite.modeling.layers.sampler import SamplerOutput
from aphrodite.platforms import current_platform

if TYPE_CHECKING:
    from aphrodite.attention import AttentionMetadata
    from aphrodite.attention.backends.abstract import AttentionBackend
    from aphrodite.modeling.sampling_metadata import SamplingMetadata

T = TypeVar('T', bound="BroadcastableModelInput")


def _add_attn_metadata_broadcastable_dict(
        tensor_dict: Dict[str, Any],
        attn_metadata: Optional["AttentionMetadata"]) -> None:
    """
    Helper method to update tensor_dict with broadcastable
    AttentionMetadata fields.
    """
    if attn_metadata is not None:
        tensor_dict.update(attn_metadata.asdict_zerocopy())


def _init_attn_metadata_from_tensor_dict(
    attn_backend: "AttentionBackend",
    tensor_dict: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Helper method to initialize AttentionMetadata based on an
    AttentionBackend and broadcastable AttentionMetadata fields.
    """
    # Extract the fields used to create AttentionMetadata.
    valid_attn_kwargs = {}
    for field in dataclasses.fields(attn_backend.get_metadata_cls()):
        val = tensor_dict.pop(field.name, None)
        if val is not None:
            valid_attn_kwargs[field.name] = val

    attn_metadata = attn_backend.make_metadata(**valid_attn_kwargs)
    tensor_dict["attn_metadata"] = attn_metadata
    return tensor_dict
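
# Illustrative sketch (not part of the original file): the two helpers above
# are the send/receive halves of a broadcast. ``attn_backend``,
# ``attn_metadata``, and ``input_tokens`` are placeholders for whatever the
# worker was configured with.
#
#     tensor_dict: Dict[str, Any] = {"input_tokens": input_tokens}
#     _add_attn_metadata_broadcastable_dict(tensor_dict, attn_metadata)
#     # ... broadcast tensor_dict to the non-driver workers ...
#     tensor_dict = _init_attn_metadata_from_tensor_dict(
#         attn_backend, tensor_dict)
#     restored = tensor_dict["attn_metadata"]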


def _init_sampling_metadata_from_tensor_dict(  # type: ignore
        tensor_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Helper method to initialize SamplingMetadata based on broadcastable
    SamplingMetadata fields.
    """
    from aphrodite.modeling.sampling_metadata import SamplingMetadata

    selected_token_indices = tensor_dict.pop("selected_token_indices", None)
    # An empty SamplingMetadata to signal that the worker should skip
    # sampling.
    if selected_token_indices is not None:
        tensor_dict["sampling_metadata"] = SamplingMetadata(
            seq_groups=None,
            selected_token_indices=selected_token_indices,
            categorized_sample_indices=None,
            num_prompts=0,
        )
    return tensor_dict


def _add_sampling_metadata_broadcastable_dict(
        tensor_dict: Dict[str, Any],
        sampling_metadata: Optional["SamplingMetadata"]) -> None:
    """
    Helper method to update tensor_dict with broadcastable
    SamplingMetadata fields.
    """
    if sampling_metadata is not None:
        tensor_dict["selected_token_indices"] = (
            sampling_metadata.selected_token_indices)
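
# Illustrative sketch (assumption, not from the original file): on the driver
# only ``selected_token_indices`` is broadcast; receivers rebuild a trimmed
# SamplingMetadata from it. ``sampling_metadata`` is a placeholder here.
#
#     tensor_dict: Dict[str, Any] = {}
#     _add_sampling_metadata_broadcastable_dict(tensor_dict, sampling_metadata)
#     # ... broadcast tensor_dict ...
#     tensor_dict = _init_sampling_metadata_from_tensor_dict(tensor_dict)
#     trimmed = tensor_dict.get("sampling_metadata")  # absent => skip sampling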


def _init_frozen_model_input_from_tensor_dict(
        frozen_model_input_cls: Type["ModelRunnerInputBase"],
        tensor_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Helper method to initialize a frozen ModelInput based on broadcastable
    ModelInput fields.
    """
    valid_tensor_kwargs = {}
    for field in dataclasses.fields(frozen_model_input_cls):
        val = tensor_dict.pop(field.name, None)
        if val is not None:
            valid_tensor_kwargs[field.name] = val

    frozen_model_input = frozen_model_input_cls(**valid_tensor_kwargs)
    tensor_dict["frozen_model_input"] = frozen_model_input
    return tensor_dict
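
# Illustrative sketch (assumption): rebuilding a frozen dataclass input on a
# receiving worker. ``MyModelInput`` is a hypothetical frozen
# ModelRunnerInputBase subclass, not a class in this codebase.
#
#     tensor_dict = _init_frozen_model_input_from_tensor_dict(
#         MyModelInput, broadcasted_tensor_dict)
#     frozen_input = tensor_dict["frozen_model_input"]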


def dump_input_when_exception(exclude_args: Optional[List[int]] = None,
                              exclude_kwargs: Optional[List[str]] = None):

    def _inner(func):

        @wraps(func)
        def _wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as err:
                timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
                filename = f"/tmp/err_{func.__name__}_input_{timestamp}.pkl"
                logger.info("Writing input of failed execution to "
                            f"{filename}...")
                with open(filename, "wb") as filep:
                    dumped_inputs = {
                        k: v
                        for k, v in kwargs.items()
                        if k not in (exclude_kwargs or [])
                    }
                    for i, arg in enumerate(args):
                        if i not in (exclude_args or []):
                            dumped_inputs[f"arg_{i}"] = arg

                    # Only persist dtype and shape for kvcache tensors
                    # (can be way too big otherwise)
                    if (kv_caches := dumped_inputs.get("kv_caches")) \
                            and isinstance(kv_caches, Iterable):
                        dumped_inputs["kv_caches"] = [(t.dtype, t.shape)
                                                      for t in kv_caches
                                                      if is_tensor(t)]
                    pickle.dump(dumped_inputs, filep)
                logger.info(
                    f"Completed writing input of failed execution to "
                    f"{filename}.")
                raise type(err)(
                    f"Error in model execution (input dumped to {filename}): "
                    f"{str(err)}") from err

        return _wrapper

    return _inner
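
# Illustrative usage sketch (assumption, not from the original file):
# ``exclude_args=[0]`` skips pickling ``self`` and ``exclude_kwargs`` skips
# values that are too large or unpicklable. ``MyModelRunner`` is hypothetical.
#
#     class MyModelRunner(ModelRunnerBase):
#
#         @dump_input_when_exception(exclude_args=[0],
#                                    exclude_kwargs=["intermediate_tensors"])
#         def execute_model(self, model_input, kv_caches,
#                           intermediate_tensors=None, num_steps=1):
#             ...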


class BroadcastableModelInput(ABC):

    @abstractmethod
    def as_broadcastable_tensor_dict(self) -> Dict[str, Any]:
        """
        Extract broadcastable fields. Override for fields that require some
        custom deserialization.
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def from_broadcasted_tensor_dict(
        cls: Type[T],
        tensor_dict: Dict[str, Any],
        attn_backend: Optional["AttentionBackend"] = None,
    ) -> T:
        """
        Pop fields from the given tensor_dict and populate a new instance of
        BroadcastableModelInput.
        """
        raise NotImplementedError


@dataclasses.dataclass(frozen=True)
class ModelRunnerInputBase(BroadcastableModelInput):
    """Local inputs to each worker's model runner. May contain
    device-specific data. Different worker backends may have different
    methods of converting from the global ExecuteModelRequest produced by the
    LLM engine to the worker-local ModelRunnerInputBase objects.

    Model runners that support multi-GPU execution should define a
    ModelRunnerInputBase subclass, add their required fields, and specify how
    to serialize/deserialize a ModelInput for broadcast between workers.
    """
    pass
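
# Minimal sketch of a concrete input type (assumption; ``MyModelInput`` is
# hypothetical). A frozen dataclass carries the device tensors, and the two
# BroadcastableModelInput hooks reuse the module-level helpers above.
#
#     @dataclasses.dataclass(frozen=True)
#     class MyModelInput(ModelRunnerInputBase):
#         input_tokens: Optional[torch.Tensor] = None
#         attn_metadata: Optional["AttentionMetadata"] = None
#
#         def as_broadcastable_tensor_dict(self) -> Dict[str, Any]:
#             tensor_dict = {"input_tokens": self.input_tokens}
#             _add_attn_metadata_broadcastable_dict(tensor_dict,
#                                                   self.attn_metadata)
#             return tensor_dict
#
#         @classmethod
#         def from_broadcasted_tensor_dict(cls, tensor_dict,
#                                          attn_backend=None):
#             if attn_backend is not None:
#                 tensor_dict = _init_attn_metadata_from_tensor_dict(
#                     attn_backend, tensor_dict)
#             return cls(**tensor_dict)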


class ModelRunnerInputBuilderBase(ABC, Generic[T]):
    """A builder to create ModelRunnerInputBase objects."""

    @abstractmethod
    def add_seq_group(self, seq_group_metadata):
        """Add metadata for a single sequence group to the builder."""
        raise NotImplementedError

    @abstractmethod
    def build(self, *args, **kwargs) -> T:
        """Build metadata with on-device tensors."""
        raise NotImplementedError


class ModelRunnerBase(ABC, Generic[T]):
    """
    Model runner interface that abstracts a particular hardware and/or type of
    model. Model execution may communicate data with model runners in other
    processes, but it should not include control plane metadata communication.

    Each ModelRunnerBase subclass should define a corresponding
    ModelRunnerInputBase subclass.
    """

    # Map of request_id -> generator used for seeded random sampling
    generators: Dict[str, torch.Generator] = {}

    @abstractmethod
    def make_model_input_from_broadcasted_tensor_dict(
        self,
        tensor_dict: Dict[str, Any],
    ) -> T:
        """
        Make an instance of a ModelRunnerInputBase from the broadcasted tensor
        dict.
        """
        raise NotImplementedError

    @abstractmethod
    def prepare_model_input(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        virtual_engine: int = 0,
        finished_requests_ids: Optional[List[str]] = None,
    ) -> T:
        """
        Prepare the inputs to ModelRunnerBase.execute_model from an execution
        request. This method may move data to the worker's local device. It is
        not allowed to communicate with other workers or devices.
        """
        raise NotImplementedError
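
    # Worker-side flow sketch (assumption, not from the original file): the
    # driver prepares the input locally and broadcasts it, while the other
    # workers rebuild it from the tensor dict before executing:
    #
    #     model_input = runner.prepare_model_input(seq_group_metadata_list)
    #     tensor_dict = model_input.as_broadcastable_tensor_dict()
    #     # ... broadcast tensor_dict ...
    #     model_input = runner.make_model_input_from_broadcasted_tensor_dict(
    #         tensor_dict)
    #     outputs = runner.execute_model(model_input, kv_caches, None)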

    @current_platform.inference_mode()
    def execute_model(
        self,
        model_input: T,
        kv_caches: Optional[List[torch.Tensor]],
        intermediate_tensors: Optional[IntermediateTensors],
        num_steps: int = 1,
    ) -> Optional[List[SamplerOutput]]:
        """
        Execute the model on the given input.
        """
        raise NotImplementedError

    def get_generators(self, finished_request_ids: Optional[List[str]] = None):
        """
        Return dict of per-request generators used for random sampling.
        """
        # Clean up generators from completed requests
        if finished_request_ids:
            for request_id in finished_request_ids:
                self.generators.pop(request_id, None)

        return self.generators
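
    # Illustrative sketch (assumption, not from the original file): a runner
    # that honors per-request seeds would populate ``self.generators`` while
    # preparing inputs, e.g.
    #
    #     if sampling_params.seed is not None:
    #         self.generators[request_id] = torch.Generator(
    #             device=self.device).manual_seed(sampling_params.seed)
    #
    # and later hand ``self.get_generators(finished_requests_ids)`` to the
    # sampling-metadata preparation step so seeded requests stay reproducible.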