123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- """ Attention layer with torch scaled_dot_product_attention
- and PagedAttention."""
- from dataclasses import dataclass
- from typing import Any, Dict, List, Optional, Tuple, Type
- import torch
- from aphrodite._ipex_ops import ipex_ops
- from aphrodite.attention.backends.abstract import (AttentionBackend,
- AttentionImpl,
- AttentionMetadata,
- AttentionType)
- from aphrodite.attention.backends.utils import (CommonAttentionState,
- CommonMetadataBuilder)
- from aphrodite.attention.ops.paged_attn import (PagedAttention,
- PagedAttentionMetadata)
- _PARTITION_SIZE = 512
- class IpexAttnBackend(AttentionBackend):
- @staticmethod
- def get_name() -> str:
- return "ipex-attn"
- @staticmethod
- def get_impl_cls() -> Type["IpexAttnBackendImpl"]:
- return IpexAttnBackendImpl
- @staticmethod
- def get_metadata_cls() -> Type["IpexAttnMetadata"]:
- return IpexAttnMetadata
- @staticmethod
- def get_builder_cls() -> Type["IpexAttnMetadataBuilder"]:
- return IpexAttnMetadataBuilder
- @staticmethod
- def get_state_cls() -> Type["CommonAttentionState"]:
- return CommonAttentionState
- @staticmethod
- def get_kv_cache_shape(
- num_blocks: int,
- block_size: int,
- num_kv_heads: int,
- head_size: int,
- ) -> Tuple[int, ...]:
- return PagedAttention.get_kv_cache_shape(num_blocks, block_size,
- num_kv_heads, head_size)
- @staticmethod
- def swap_blocks(
- src_kv_cache: torch.Tensor,
- dst_kv_cache: torch.Tensor,
- src_to_dst: torch.Tensor,
- ) -> None:
- from aphrodite._ipex_ops import ipex_ops as ops
- ops.swap_blocks(src_kv_cache, dst_kv_cache, src_to_dst)
- @staticmethod
- def copy_blocks(
- kv_caches: List[torch.Tensor],
- src_to_dists: torch.Tensor,
- ) -> None:
- from aphrodite._ipex_ops import ipex_ops as ops
- key_caches = [kv_cache[0] for kv_cache in kv_caches]
- value_caches = [kv_cache[1] for kv_cache in kv_caches]
- ops.copy_blocks(key_caches, value_caches, src_to_dists)
- @dataclass
- class IpexAttnMetadata(AttentionMetadata, PagedAttentionMetadata):
- """Metadata for IpexAttnBackend.
- """
- # Currently, input sequences can only contain all prompts
- # or all decoding. True if all sequences are prompts.
- is_prompt: bool
- slot_mapping: torch.Tensor
- seq_lens: Optional[List[int]]
- seqlen_q: Optional[torch.Tensor]
- max_seqlen: Optional[int]
- def __post_init__(self):
- # Set during the execution of the first attention op.
- # It is a list because it is needed to set per prompt
- # when alibi slopes is used. It is because of the limitation
- # from xformer API.
- # will not appear in the __repr__ and __init__
- self.attn_bias: Optional[List[torch.Tensor]] = None
- @property
- def prefill_metadata(self) -> Optional["IpexAttnMetadata"]:
- # Currently chunked prefill is not supported
- if self.num_decode_tokens == 0:
- assert self.num_prefills > 0
- return self
- return None
- @property
- def decode_metadata(self) -> Optional["IpexAttnMetadata"]:
- # Currently chunked prefill is not supported
- if self.num_prefills > 0:
- assert self.num_decode_tokens == 0
- return None
- return self
- class IpexAttnMetadataBuilder(CommonMetadataBuilder[IpexAttnMetadata]):
- _metadata_cls = IpexAttnMetadata
- class IpexAttnBackendImpl(AttentionImpl[IpexAttnMetadata]):
- def __init__(
- self,
- num_heads: int,
- head_size: int,
- scale: float,
- num_kv_heads: int,
- alibi_slopes: Optional[List[float]],
- sliding_window: Optional[int],
- kv_cache_dtype: str,
- blocksparse_params: Optional[Dict[str, Any]] = None,
- logits_soft_cap: Optional[float] = None,
- ) -> None:
- if blocksparse_params is not None:
- raise ValueError(
- "IPEX backend does not support block-sparse attention.")
- if logits_soft_cap is not None:
- raise ValueError("IPEX backend does not support logits_soft_cap.")
- self.num_heads = num_heads
- self.head_size = head_size
- self.scale = float(scale)
- self.num_kv_heads = num_kv_heads
- if alibi_slopes is not None:
- alibi_slopes = torch.tensor(alibi_slopes, dtype=torch.float32)
- self.alibi_slopes = alibi_slopes
- self.sliding_window = sliding_window
- self.kv_cache_dtype = kv_cache_dtype
- assert self.num_heads % self.num_kv_heads == 0
- self.num_queries_per_kv = self.num_heads // self.num_kv_heads
- self.need_mask = (self.alibi_slopes is not None
- or self.sliding_window is not None)
- supported_head_sizes = PagedAttention.get_supported_head_sizes()
- if head_size not in supported_head_sizes:
- raise ValueError(
- f"Head size {head_size} is not supported by PagedAttention. "
- f"Supported head sizes are: {supported_head_sizes}.")
- if kv_cache_dtype != "auto":
- raise NotImplementedError(
- "IPEX backend does not support FP8 KV cache. "
- "Please use xFormers backend instead.")
- def split_kv_cache(
- self,
- kv_cache: torch.Tensor,
- num_kv_heads: int,
- head_size: int,
- ) -> Tuple[torch.Tensor, torch.Tensor]:
- x = 1
- num_blocks = kv_cache.shape[1]
- key_cache = kv_cache[0]
- key_cache = key_cache.view(num_blocks, num_kv_heads, head_size // x,
- -1, x)
- value_cache = kv_cache[1]
- value_cache = value_cache.view(num_blocks, num_kv_heads, head_size, -1)
- return key_cache, value_cache
- def forward(
- self,
- query: torch.Tensor,
- key: torch.Tensor,
- value: torch.Tensor,
- kv_cache: Optional[torch.Tensor],
- attn_metadata: IpexAttnMetadata, # type: ignore
- k_scale: float = 1.0,
- v_scale: float = 1.0,
- attn_type: AttentionType = AttentionType.DECODER,
- ) -> torch.Tensor:
- """Forward pass with IPEX varlen_attention and PagedAttention.
- Args:
- query: shape = [num_tokens, num_heads * head_size]
- key: shape = [num_tokens, num_kv_heads * head_size]
- value: shape = [num_tokens, num_kv_heads * head_size]
- kv_cache = [2, num_blocks, block_size * num_kv_heads * head_size]
- attn_metadata: Metadata for attention.
- Returns:
- shape = [num_tokens, num_heads * head_size]
- """
- assert k_scale == 1.0 and v_scale == 1.0
- if attn_type != AttentionType.DECODER:
- raise NotImplementedError("Encoder self-attention and "
- "encoder/decoder cross-attention "
- "are not implemented for "
- "IpexAttnBackendImpl")
- num_tokens, hidden_size = query.shape
- # Reshape the query, key, and value tensors.
- query = query.view(-1, self.num_heads, self.head_size)
- key = key.view(-1, self.num_kv_heads, self.head_size)
- value = value.view(-1, self.num_kv_heads, self.head_size)
- if kv_cache is not None:
- key_cache, value_cache = self.split_kv_cache(
- kv_cache, self.num_kv_heads, self.head_size)
- ipex_ops.reshape_and_cache(
- key,
- value,
- key_cache,
- value_cache,
- attn_metadata.slot_mapping.flatten(),
- self.kv_cache_dtype,
- k_scale,
- v_scale,
- )
- if attn_metadata.is_prompt:
- assert attn_metadata.seq_lens is not None
- if (kv_cache is None or attn_metadata.block_tables.numel() == 0):
- if self.num_kv_heads != self.num_heads:
- key = key.repeat_interleave(self.num_queries_per_kv, dim=1)
- value = value.repeat_interleave(self.num_queries_per_kv,
- dim=1)
- if attn_metadata.attn_bias is None:
- if self.alibi_slopes is not None:
- att_masks = _make_alibi_bias(
- self.alibi_slopes, query.dtype,
- attn_metadata.seq_lens) # type: ignore
- elif self.sliding_window is not None:
- att_masks = _make_sliding_window_bias(
- attn_metadata.seq_lens, self.sliding_window,
- query.dtype) # type: ignore
- else:
- att_masks = _make_sliding_window_bias(
- attn_metadata.seq_lens, None, dtype=query.dtype)
- attn_metadata.attn_bias = att_masks
- output = torch.empty(
- (num_tokens, self.num_heads, self.head_size),
- dtype=query.dtype,
- device=query.device)
- ipex_ops.varlen_attention(query,
- key,
- value,
- output,
- attn_metadata.seqlen_q,
- attn_metadata.seqlen_q,
- attn_metadata.max_seqlen,
- attn_metadata.max_seqlen,
- pdropout=0.0,
- softmax_scale=self.scale,
- zero_tensors=False,
- is_causal=True,
- return_softmax=False,
- gen_=None)
- else:
- # prefix-enabled attention
- raise RuntimeError(
- "IPEX backend doesn't support prefix decoding.")
- else:
- # Decoding run.
- max_seq_len = attn_metadata.max_decode_seq_len
- output = torch.empty_like(query)
- block_size = value_cache.shape[3]
- num_seqs, num_heads, head_size = query.shape
- max_num_partitions = ((max_seq_len + _PARTITION_SIZE - 1) //
- _PARTITION_SIZE)
- # NOTE: We use a simple heuristic to decide whether to use
- # PagedAttention V1 or V2. If the number of partitions is 1, we use
- # V1 to avoid the overhead of reduction. Also, if the number of
- # sequences or heads is large, we use V1 since there is enough work
- # to parallelize.
- # TODO: Tune this heuristic.
- # For context len > 8192, use V2 kernel to avoid shared memory
- # shortage.
- use_v1 = (max_seq_len <= 8192 and
- (max_num_partitions == 1 or num_seqs * num_heads > 512))
- if use_v1:
- # Run PagedAttention V1.
- ipex_ops.paged_attention_v1(
- output,
- query,
- key_cache,
- value_cache,
- self.num_kv_heads,
- self.scale,
- attn_metadata.block_tables,
- attn_metadata.seq_lens_tensor,
- block_size,
- max_seq_len,
- self.alibi_slopes,
- self.kv_cache_dtype,
- k_scale,
- v_scale,
- )
- else:
- # Run PagedAttention V2.
- assert _PARTITION_SIZE % block_size == 0
- tmp_output = torch.empty(
- size=(num_seqs, num_heads, max_num_partitions, head_size),
- dtype=output.dtype,
- device=output.device,
- )
- exp_sums = torch.empty(
- size=(num_seqs, num_heads, max_num_partitions),
- dtype=torch.float32,
- device=output.device,
- )
- max_logits = torch.empty_like(exp_sums)
- ipex_ops.paged_attention_v2(
- output,
- exp_sums,
- max_logits,
- tmp_output,
- query,
- key_cache,
- value_cache,
- self.num_kv_heads,
- self.scale,
- attn_metadata.block_tables,
- attn_metadata.seq_lens_tensor,
- block_size,
- max_seq_len,
- self.alibi_slopes,
- self.kv_cache_dtype,
- k_scale,
- v_scale,
- )
- # Reshape the output tensor.
- return output.view(-1, self.num_heads * self.head_size)
- def _make_alibi_bias(
- alibi_slopes: torch.Tensor,
- dtype: torch.dtype,
- seq_lens: List[int],
- ) -> List[torch.Tensor]:
- attn_biases = []
- for seq_len in seq_lens:
- bias = torch.arange(seq_len, dtype=dtype, device=alibi_slopes.device)
- # NOTE: HF uses
- # `bias = bias[None, :].repeat(seq_len, 1)`
- # here. We find that both biases give the same results, but
- # the bias below more accurately follows the original ALiBi
- # paper.
- bias = bias[None, :] - bias[:, None]
- num_heads = alibi_slopes.shape[0]
- bias = bias[None, :].repeat((num_heads, 1, 1))
- bias.mul_(alibi_slopes[:, None, None])
- inf_mask = torch.empty(
- (1, seq_len, seq_len),
- dtype=bias.dtype,
- device=alibi_slopes.device).fill_(-torch.inf).triu_(diagonal=1)
- attn_biases.append((bias + inf_mask).to(dtype))
- return attn_biases
- def _make_sliding_window_bias(
- seq_lens: List[int],
- window_size: Optional[int],
- dtype: torch.dtype,
- ) -> List[torch.Tensor]:
- attn_biases = []
- for seq_len in seq_lens:
- tensor = torch.full(
- (1, seq_len, seq_len),
- dtype=dtype,
- fill_value=1,
- )
- shift = 0
- mask = torch.tril(tensor, diagonal=shift).to(dtype) # type: ignore
- if window_size is not None:
- mask = torch.triu(mask, diagonal=shift - window_size + 1)
- mask = torch.log(mask)
- attn_biases.append(mask.to(dtype))
- return attn_biases
|