abstract.py 6.6 KB


  1. from abc import ABC, abstractmethod
  2. from contextlib import contextmanager
  3. from dataclasses import dataclass, fields
  4. from enum import Enum, auto
  5. from typing import (TYPE_CHECKING, Any, Dict, Generic, List, Optional, Set,
  6. Tuple, Type, TypeVar)
  7. import torch
  8. if TYPE_CHECKING:
  9. from aphrodite.task_handler.model_runner_base import (
  10. ModelRunnerBase, ModelRunnerInputBase, ModelRunnerInputBuilderBase)
  11. class AttentionType(Enum):
  12. DECODER = auto() # Decoder attention between previous layer Q/K/V
  13. ENCODER = auto() # Encoder attention between previous layer Q/K/V
  14. ENCODER_DECODER = auto() # Attention between dec. Q and enc. K/V
  15. class AttentionBackend(ABC):
  16. """Abstract class for attention backends."""
  17. @staticmethod
  18. @abstractmethod
  19. def get_name() -> str:
  20. raise NotImplementedError
  21. @staticmethod
  22. @abstractmethod
  23. def get_impl_cls() -> Type["AttentionImpl"]:
  24. raise NotImplementedError
  25. @staticmethod
  26. @abstractmethod
  27. def get_metadata_cls() -> Type["AttentionMetadata"]:
  28. raise NotImplementedError
  29. @staticmethod
  30. def get_state_cls() -> Type["AttentionState"]:
  31. raise NotImplementedError
  32. @classmethod
  33. def make_metadata(cls, *args, **kwargs) -> "AttentionMetadata":
  34. return cls.get_metadata_cls()(*args, **kwargs)
  35. @staticmethod
  36. @abstractmethod
  37. def get_builder_cls() -> Type["AttentionMetadataBuilder"]:
  38. raise NotImplementedError
  39. @classmethod
  40. def make_metadata_builder(cls, *args,
  41. **kwargs) -> "AttentionMetadataBuilder":
  42. return cls.get_builder_cls()(*args, **kwargs)
  43. @staticmethod
  44. @abstractmethod
  45. def get_kv_cache_shape(
  46. num_blocks: int,
  47. block_size: int,
  48. num_kv_heads: int,
  49. head_size: int,
  50. ) -> Tuple[int, ...]:
  51. raise NotImplementedError
  52. @staticmethod
  53. @abstractmethod
  54. def swap_blocks(
  55. src_kv_cache: torch.Tensor,
  56. dst_kv_cache: torch.Tensor,
  57. src_to_dst: torch.Tensor,
  58. ) -> None:
  59. raise NotImplementedError
  60. @staticmethod
  61. @abstractmethod
  62. def copy_blocks(
  63. kv_caches: List[torch.Tensor],
  64. src_to_dists: torch.Tensor,
  65. ) -> None:
  66. raise NotImplementedError
  67. def advance_step(self, num_seqs: int, num_queries: int):
  68. raise NotImplementedError
  69. @dataclass
  70. class AttentionMetadata:
  71. """Attention metadata for prefill and decode batched together."""
  72. # Total number of prefill requests.
  73. num_prefills: int
  74. # Number of prefill tokens.
  75. num_prefill_tokens: int
  76. # Number of decode tokens. Note that it is equivalent to the number of
  77. # decode requests.
  78. num_decode_tokens: int
  79. # (num_tokens,). The indices of the token slots that input tokens will be
  80. # stored into. E.g., if `slot_mapping` is [35, 2, 17] and the block size
  81. # is 16, the three tokens are stored in the 3rd slot in block 2, 2nd slot
  82. # in block 0, and 1st slot in block 1, respectively.
  83. slot_mapping: torch.Tensor
  84. @property
  85. @abstractmethod
  86. def prefill_metadata(self) -> Optional["AttentionMetadata"]:
  87. """Return the attention metadata that's required to run prefill
  88. attention."""
  89. pass
  90. @property
  91. @abstractmethod
  92. def decode_metadata(self) -> Optional["AttentionMetadata"]:
  93. """Return the attention metadata that's required to run decode
  94. attention."""
  95. pass
  96. def asdict_zerocopy(self,
  97. skip_fields: Optional[Set[str]] = None
  98. ) -> Dict[str, Any]:
  99. """Similar to dataclasses.asdict, but avoids deepcopying."""
  100. if skip_fields is None:
  101. skip_fields = set()
  102. # Note that if we add dataclasses as fields, they will need
  103. # similar handling.
  104. return {
  105. field.name: getattr(self, field.name)
  106. for field in fields(self) if field.name not in skip_fields
  107. }
  108. T = TypeVar("T", bound=AttentionMetadata)
  109. class AttentionState(ABC, Generic[T]):
  110. """Holds attention backend specific objects reused during the
  111. lifetime of the model runner.
  112. """
  113. @abstractmethod
  114. def __init__(self, runner: "ModelRunnerBase"):
  115. ...
  116. @abstractmethod
  117. @contextmanager
  118. def graph_capture(self, max_batch_size: int):
  119. """Context manager used when capturing a CUDA graph."""
  120. yield
  121. @abstractmethod
  122. def graph_clone(self, batch_size: int) -> "AttentionState[T]":
  123. """Clone attention state to save in CUDA graph metadata."""
  124. ...
  125. @abstractmethod
  126. def graph_capture_get_metadata_for_batch(self, batch_size: int) -> T:
  127. """Get attention metadata for CUDA graph capture of batch_size."""
  128. ...
  129. @abstractmethod
  130. def get_graph_input_buffers(self, attn_metadata: T) -> Dict[str, Any]:
  131. """Get attention-specific input buffers for CUDA graph capture."""
  132. ...
  133. @abstractmethod
  134. def prepare_graph_input_buffers(self, input_buffers: Dict[str, Any],
  135. attn_metadata: T) -> None:
  136. """In-place modify input buffers dict for CUDA graph replay."""
  137. ...
  138. @abstractmethod
  139. def begin_forward(self, model_input: "ModelRunnerInputBase") -> None:
  140. """Prepare state for forward pass."""
  141. ...
  142. class AttentionMetadataBuilder(ABC, Generic[T]):
  143. """Abstract class for attention metadata builders."""
  144. @abstractmethod
  145. def __init__(self, input_builder: "ModelRunnerInputBuilderBase") -> None:
  146. raise NotImplementedError
  147. @abstractmethod
  148. def build(self, seq_lens: List[int], query_lens: List[int],
  149. cuda_graph_pad_size: int, batch_size: int) -> T:
  150. """Build attention metadata with on-device tensors."""
  151. raise NotImplementedError
  152. class AttentionImpl(ABC, Generic[T]):
  153. @abstractmethod
  154. def __init__(
  155. self,
  156. num_heads: int,
  157. head_size: int,
  158. scale: float,
  159. num_kv_heads: Optional[int] = None,
  160. alibi_slopes: Optional[List[float]] = None,
  161. sliding_window: Optional[int] = None,
  162. kv_cache_dtype: str = "auto",
  163. blocksparse_params: Optional[Dict[str, Any]] = None,
  164. logits_soft_cap: Optional[float] = None,
  165. ) -> None:
  166. raise NotImplementedError
  167. @abstractmethod
  168. def forward(
  169. self,
  170. query: torch.Tensor,
  171. key: torch.Tensor,
  172. value: torch.Tensor,
  173. kv_cache: torch.Tensor,
  174. attn_metadata: T,
  175. k_scale: float = 1.0,
  176. v_scale: float = 1.0,
  177. attn_type: AttentionType = AttentionType.DECODER,
  178. ) -> torch.Tensor:
  179. raise NotImplementedError