from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Type

import torch
import torch_xla.experimental.custom_kernel  # Required to register custom ops.

from aphrodite.attention.backends.abstract import (AttentionBackend,
                                                   AttentionImpl,
                                                   AttentionMetadata,
                                                   AttentionType)
from aphrodite.attention.backends.utils import CommonAttentionState


class PallasAttentionBackend(AttentionBackend):

    @staticmethod
    def get_impl_cls() -> Type["PallasAttentionBackendImpl"]:
        return PallasAttentionBackendImpl

    @staticmethod
    def get_metadata_cls() -> Type["PallasMetadata"]:
        return PallasMetadata

    @staticmethod
    def get_state_cls() -> Type["CommonAttentionState"]:
        return CommonAttentionState

    @staticmethod
    def get_kv_cache_shape(
        num_blocks: int,
        block_size: int,
        num_kv_heads: int,
        head_size: int,
    ) -> Tuple[int, ...]:
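        # Each of the K and V caches is laid out per KV head. As an
        # illustrative example (sizes assumed, not defaults): num_kv_heads=8,
        # num_blocks=512, block_size=16, head_size=128 gives a
        # (8, 512, 16, 128) tensor for each of K and V.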
        return (num_kv_heads, num_blocks, block_size, head_size)

    @staticmethod
    def swap_blocks(
        src_kv_cache: torch.Tensor,
        dst_kv_cache: torch.Tensor,
        src_to_dst: torch.Tensor,
    ) -> None:
        raise RuntimeError("swap_blocks is not used for the TPU backend.")

    @torch.compile(backend="openxla")
    @staticmethod
    def copy_blocks(
        kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
        src_to_dists: Tuple[torch.Tensor, torch.Tensor],
    ) -> None:
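        # src_to_dists is a pair of equal-length 1-D index tensors: for every
        # K/V cache, block src_indices[i] is copied in place to block
        # dst_indices[i] along the block dimension (dim 1).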
        src_indices, dst_indices = src_to_dists
        for k_cache, v_cache in kv_caches:
            torch.ops.xla.dynamo_set_buffer_donor_(k_cache, True)
            k_cache[:, dst_indices] = k_cache[:, src_indices]
            torch.ops.xla.dynamo_set_buffer_donor_(v_cache, True)
            v_cache[:, dst_indices] = v_cache[:, src_indices]


@dataclass
class PallasMetadata(AttentionMetadata):

    # Currently, a batch consists of either all-prefill or all-decode
    # requests; prefill and decode are never mixed in the same batch.
    block_tables: Optional[torch.Tensor] = None
    context_lens: Optional[torch.Tensor] = None
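    # In a decode batch, block_tables typically has shape
    # [batch_size, max_blocks_per_seq] and holds physical KV-cache block
    # indices, while context_lens has shape [batch_size] and holds the
    # per-sequence context lengths. (Shapes describe typical usage and are
    # not asserted here.)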

    @property
    def prefill_metadata(self) -> Optional["PallasMetadata"]:
        if self.num_prefills == 0:
            return None

        assert self.num_decode_tokens == 0
        assert self.block_tables is None
        assert self.context_lens is None
        return self

    @property
    def decode_metadata(self) -> Optional["PallasMetadata"]:
        if self.num_decode_tokens == 0:
            return None

        assert self.num_prefills == 0
        assert self.num_prefill_tokens == 0
        assert self.block_tables is not None
        assert self.context_lens is not None
        return self


class PallasAttentionBackendImpl(AttentionImpl):

    def __init__(
        self,
        num_heads: int,
        head_size: int,
        scale: float,
        num_kv_heads: int,
        alibi_slopes: Optional[List[float]],
        sliding_window: Optional[int],
        kv_cache_dtype: str,
        blocksparse_params: Optional[Dict[str, Any]] = None,
        logits_soft_cap: Optional[float] = None,
    ) -> None:
        self.num_heads = num_heads
        self.head_size = head_size
        self.scale = float(scale)
        self.num_kv_heads = num_heads if num_kv_heads is None else num_kv_heads
        assert self.num_heads % self.num_kv_heads == 0
        self.num_queries_per_kv = self.num_heads // self.num_kv_heads
        if head_size % 128 != 0:
            raise NotImplementedError("Head size must be a multiple of 128.")
        if alibi_slopes is not None:
            raise NotImplementedError("ALiBi slopes are not supported.")
        if sliding_window is not None:
            raise NotImplementedError("Sliding window is not supported.")
        if kv_cache_dtype != "auto":
            raise NotImplementedError("FP8 KV cache dtype is not supported.")
        if blocksparse_params is not None:
            raise NotImplementedError("Blocksparse is not supported.")
        if logits_soft_cap is not None:
            raise NotImplementedError(
                "Attention logits soft-capping is not supported.")

        if torch_xla.tpu.version() < 4:
            raise NotImplementedError("TPU version must be 4 or higher.")
        self.megacore_mode = None
        tpu_env = torch_xla.tpu.get_tpu_env()
        tpu_type = (tpu_env.get("ACCELERATOR_TYPE", None)
                    or tpu_env.get("TYPE", None)
                    or tpu_env.get("TPU_ACCELERATOR_TYPE", None))
        assert tpu_type is not None
        tpu_type = tpu_type.lower()

        if "lite" not in tpu_type:
            if self.num_kv_heads % 2 == 0:
                self.megacore_mode = "kv_head"
            else:
                # NOTE: The "batch" mode requires an even batch size, so
                # forward() falls back to no megacore mode whenever the
                # batch size is odd.
                self.megacore_mode = "batch"

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: Tuple[Optional[torch.Tensor], Optional[torch.Tensor]],
        attn_metadata: PallasMetadata,
        k_scale: float = 1.0,
        v_scale: float = 1.0,
        attn_type: AttentionType = AttentionType.DECODER,
    ) -> torch.Tensor:
        """Forward pass with Pallas attention.

        Args:
            query: shape = [batch_size, seq_len, num_heads * head_size]
            key: shape = [batch_size, seq_len, num_kv_heads * head_size]
            value: shape = [batch_size, seq_len, num_kv_heads * head_size]
            key_cache: shape = [num_kv_heads, num_blocks, block_size,
                head_size]
            value_cache: shape = [num_kv_heads, num_blocks, block_size,
                head_size]
            attn_metadata: Metadata for attention.
        Returns:
            shape = [batch_size, seq_len, num_heads * head_size]
        """
        assert k_scale == 1.0 and v_scale == 1.0
        if attn_type != AttentionType.DECODER:
            raise NotImplementedError("Encoder self-attention and "
                                      "encoder/decoder cross-attention "
                                      "are not implemented for "
                                      "PallasAttentionBackendImpl")
        batch_size, seq_len, hidden_size = query.shape
        query = query.view(batch_size, seq_len, self.num_heads, self.head_size)
        key = key.view(batch_size, seq_len, self.num_kv_heads, self.head_size)
        value = value.view(batch_size, seq_len, self.num_kv_heads,
                           self.head_size)

        if kv_cache[0] is not None:
            slot_mapping = attn_metadata.slot_mapping
            key_cache, value_cache = kv_cache
            write_to_kv_cache(key, value, key_cache, value_cache, slot_mapping)
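
        # Scale the query up front: neither kernel call below is passed a
        # softmax-scale argument.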
        query = query * self.scale
        if attn_metadata.num_prefills > 0:
            assert seq_len % 16 == 0, (
                "Pallas FlashAttention kernel requires seq_len to be a "
                f"multiple of 16 but got {seq_len}")

            # Handle GQA/MQA: repeat each KV head so K/V match the query
            # head count.
            if self.num_kv_heads != self.num_heads:
                key = key.repeat_interleave(self.num_queries_per_kv, dim=-2)
                key = key.view(batch_size, seq_len, self.num_heads,
                               self.head_size)
                value = value.repeat_interleave(self.num_queries_per_kv,
                                                dim=-2)
                value = value.view(batch_size, seq_len, self.num_heads,
                                   self.head_size)
            # FlashAttention requires [batch_size, num_heads, seq_len, d_model]
            # while the input is [batch_size, seq_len, num_heads, d_model].
            # Permute the input to match the required format.
            output = torch.ops.xla.flash_attention(
                query.permute(0, 2, 1, 3),
                key.permute(0, 2, 1, 3),
                value.permute(0, 2, 1, 3),
                True,  # causal masking
            )
            output = output.permute(0, 2, 1, 3)
        else:
            # Decoding run.
            assert kv_cache[0] is not None

            pages_per_compute_block = 16  # TODO: Tune this value.
            if self.megacore_mode == "batch" and batch_size % 2 != 0:
                megacore_mode = None
            else:
                megacore_mode = self.megacore_mode

            # NOTE: A temporary workaround to avoid the error:
            # "xla::paged_attention() Expected a value of type 'str' for
            # argument 'megacore_mode' but instead found type 'NoneType'."
            if megacore_mode is not None:
                output = torch.ops.xla.paged_attention(
                    query.squeeze(dim=1),
                    key_cache,
                    value_cache,
                    attn_metadata.context_lens,
                    attn_metadata.block_tables,
                    pages_per_compute_block,
                    megacore_mode=megacore_mode,
                )
            else:
                output = torch.ops.xla.paged_attention(
                    query.squeeze(dim=1),
                    key_cache,
                    value_cache,
                    attn_metadata.context_lens,
                    attn_metadata.block_tables,
                    pages_per_compute_block,
                )

        # Reshape the output tensor.
        return output.reshape(batch_size, seq_len, hidden_size)


def write_to_kv_cache(
    key: torch.Tensor,
    value: torch.Tensor,
    key_cache: torch.Tensor,
    value_cache: torch.Tensor,
    slot_mapping: torch.Tensor,
) -> None:
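    # Mark both caches as buffer donors so XLA may reuse their storage for
    # the in-place update, then flatten caches and inputs so slot_mapping can
    # scatter the new keys/values with a single index_copy_ along dim 0 of
    # the flattened [num_kv_heads * num_blocks * block_size, head_size]
    # caches.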
    torch.ops.xla.dynamo_set_buffer_donor_(key_cache, True)
    torch.ops.xla.dynamo_set_buffer_donor_(value_cache, True)

    key = key.flatten(0, 2)
    value = value.flatten(0, 2)
    key_cache = key_cache.flatten(0, 2)
    value_cache = value_cache.flatten(0, 2)
    key_cache.index_copy_(0, slot_mapping, key)
    value_cache.index_copy_(0, slot_mapping, value)
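

# Illustrative sketch only (not used by the backend): a standalone,
# CPU-runnable example of the GQA/MQA head expansion performed in the
# prefill branch of PallasAttentionBackendImpl.forward. All sizes below are
# assumptions chosen to satisfy the backend's constraints
# (head_size % 128 == 0, prefill seq_len % 16 == 0).
def _gqa_expansion_sketch() -> None:
    batch_size, seq_len = 1, 16
    num_heads, num_kv_heads, head_size = 8, 2, 128
    num_queries_per_kv = num_heads // num_kv_heads

    key = torch.zeros(batch_size, seq_len, num_kv_heads, head_size)
    # Repeat each KV head num_queries_per_kv times so K matches the query
    # head count expected by the flash attention kernel.
    key = key.repeat_interleave(num_queries_per_kv, dim=-2)
    assert key.shape == (batch_size, seq_len, num_heads, head_size)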