123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- from typing import Any, Dict, List, Optional, Tuple, Union
- import torch
- from loguru import logger
- from torch.nn import Module
- from torch.nn.parameter import Parameter
- from aphrodite import _custom_ops as ops
- from aphrodite.modeling.layers.linear import LinearBase, LinearMethodBase
- from aphrodite.quantization.base_config import (QuantizationConfig,
- QuantizeMethodBase)
- from aphrodite.modeling.utils import set_weight_attrs
- from aphrodite.common.utils import print_warning_once
- ACTIVATION_SCHEMES = ["static", "dynamic"]
- def cutlass_fp8_supported() -> bool:
- capability = torch.cuda.get_device_capability()
- capability = capability[0] * 10 + capability[1]
- return ops.cutlass_scaled_mm_supports_fp8(capability)
- class Fp8Config(QuantizationConfig):
- """Config class for FP8."""
- def __init__(
- self,
- is_checkpoint_fp8_serialized: bool = False,
- activation_scheme: str = "dynamic",
- ) -> None:
- self.is_checkpoint_fp8_serialized = is_checkpoint_fp8_serialized
- if is_checkpoint_fp8_serialized:
- logger.warning("Detected fp8 checkpoint. Please note that the "
- "format is experimental and subject to change.")
- if activation_scheme not in ACTIVATION_SCHEMES:
- raise ValueError(
- f"Unsupported activation scheme {activation_scheme}")
- self.activation_scheme = activation_scheme
- @classmethod
- def get_name(cls) -> str:
- return "fp8"
- @classmethod
- def get_supported_act_dtypes(cls) -> List[torch.dtype]:
- return [torch.bfloat16, torch.half]
- @classmethod
- def get_min_capability(cls) -> int:
- return 89
- @classmethod
- def get_config_filenames(cls) -> List[str]:
- return []
- @classmethod
- def from_config(cls, config: Dict[str, Any]) -> "Fp8Config":
- quant_method = cls.get_from_keys(config, ["quant_method"])
- is_checkpoint_fp8_serialized = ("fp8" in quant_method)
- activation_scheme = cls.get_from_keys(config, ["activation_scheme"])
- return cls(is_checkpoint_fp8_serialized=is_checkpoint_fp8_serialized,
- activation_scheme=activation_scheme)
- def get_quant_method(
- self, layer: torch.nn.Module) -> Optional["QuantizeMethodBase"]:
- from aphrodite.attention.layer import Attention # Avoid circular import
- if isinstance(layer, LinearBase):
- return Fp8LinearMethod(self)
- if isinstance(layer, Attention):
- return Fp8KVCacheMethod(self)
- return None
- def get_scaled_act_names(self) -> List[str]:
- return []
- class Fp8LinearMethod(LinearMethodBase):
- """Linear method for FP8.
- Supports loading FP8 checkpoints with static weight scale and
- dynamic/static activation scale.
- Also supports loading quantized FP16/BF16 model checkpoints with dynamic
- activation scaling. The weight scaling factor will be initialized after
- the model weights are loaded.
- Limitations:
- 1. Only support per-tensor quantization due to torch._scaled_mm support.
- 2. Only support float8_e4m3fn data type due to the limitation of
- torch._scaled_mm (https://github.com/pytorch/pytorch/blob/2e48b39603411a41c5025efbe52f89560b827825/aten/src/ATen/native/cuda/Blas.cpp#L854-L856)
- Args:
- quant_config: The quantization config.
- """
- def __init__(self, quant_config: Fp8Config):
- self.quant_config = quant_config
- self.cutlass_fp8_supported = cutlass_fp8_supported()
- def _create_scale_param(
- self,
- scale_name: str,
- layer: torch.nn.Module,
- output_partition_sizes: List[int],
- **extra_weight_attrs,
- ) -> None:
- scale = Parameter(torch.empty(len(output_partition_sizes),
- dtype=torch.float32),
- requires_grad=False)
- layer.register_parameter(scale_name, scale)
- set_weight_attrs(
- scale, {
- **extra_weight_attrs,
- "fp8_scales_shard_indexer":
- self.scales_shard_indexer,
- })
- def create_weights(
- self,
- layer: torch.nn.Module,
- input_size_per_partition: int,
- output_partition_sizes: List[int],
- input_size: int,
- output_size: int,
- params_dtype: torch.dtype,
- **extra_weight_attrs,
- ):
- del input_size, output_size
- output_size_per_partition = sum(output_partition_sizes)
- layer.process_after_load = True
- layer.logical_widths = output_partition_sizes
- # WEIGHT
- weight_dtype = (torch.float8_e4m3fn
- if self.quant_config.is_checkpoint_fp8_serialized else
- params_dtype)
- weight = Parameter(torch.empty(output_size_per_partition,
- input_size_per_partition,
- dtype=weight_dtype),
- requires_grad=False)
- layer.register_parameter("weight", weight)
- set_weight_attrs(weight, {
- **extra_weight_attrs,
- "input_dim": 1,
- "output_dim": 0,
- })
- # If checkpoint is serialized fp8, load them.
- # Otherwise, wait until process_weights_after_loading.
- if self.quant_config.is_checkpoint_fp8_serialized:
- # WEIGHT SCALE
- self._create_scale_param(
- scale_name="weight_scale",
- layer=layer,
- output_partition_sizes=output_partition_sizes,
- **extra_weight_attrs)
- # INPUT ACTIVATION SCALE
- if self.quant_config.activation_scheme == "static":
- self._create_scale_param(
- scale_name="input_scale",
- layer=layer,
- output_partition_sizes=output_partition_sizes,
- **extra_weight_attrs)
- def scales_shard_indexer(
- self, param: torch.Tensor, loaded_weight: torch.Tensor,
- shard_id: Union[str, int]) -> Tuple[torch.Tensor, torch.Tensor]:
- qkv_idxs = {"q": 0, "k": 1, "v": 2}
- if isinstance(shard_id, int):
- pass
- elif isinstance(shard_id, str):
- if shard_id not in qkv_idxs:
- raise ValueError(f"Unknown shard_id: {shard_id}")
- shard_id = qkv_idxs[shard_id]
- else:
- ValueError(f"Shard id must be int or str but got {type(shard_id)}")
- return param[shard_id], loaded_weight
- def process_weights_after_loading(self, layer: Module) -> None:
- if (not hasattr(layer, "process_after_load")
- or not layer.process_after_load):
- return
- # If checkpoint is fp/bf16 (not serialized fp8), quantize the weights.
- if not self.quant_config.is_checkpoint_fp8_serialized:
- qweight, weight_scale = ops.scaled_fp8_quant(layer.weight,
- scale=None)
- layer.weight = Parameter(qweight.t(), requires_grad=False)
- layer.weight_scale = Parameter(weight_scale, requires_grad=False)
- layer.logical_widths = None
- layer.input_scale = None
- return
- # If checkpoint is fp8, requantize the separately quantized logical
- # weights into a single fp8 weight with a single weight scale.
- else:
- # WEIGHT_SCALE / WEIGHT
- # Loop over logical weights, requantizing with single scale.
- max_w_scale = layer.weight_scale.max()
- start = 0
- for idx, logical_width in enumerate(layer.logical_widths):
- end = start + logical_width
- weight_dq = per_tensor_dequantize(layer.weight[start:end, :],
- layer.weight_scale[idx])
- layer.weight[start:end, :] = per_tensor_quantize(
- weight_dq, layer.weight_scale.max())
- start = end
- layer.weight_scale = Parameter(max_w_scale, requires_grad=False)
- # WEIGHT
- # Transpose weight for passing to torch._scaled_mm
- weight = layer.weight
- layer.weight = Parameter(weight.t(), requires_grad=False)
- # INPUT ACTIVATION SCALE
- # Dynamic: set to None (required input to ops.scaled_fp8_quant).
- # Static: set to max of the input_scales (since they are equal).
- if self.quant_config.activation_scheme == "dynamic":
- layer.input_scale = None
- elif self.quant_config.activation_scheme == "static":
- if not all_close_1d(layer.input_scale):
- raise ValueError(
- "All the input_scales for the logical weights of a "
- f"layer must be equal. But got {layer.input_scale}")
- layer.input_scale = Parameter(layer.input_scale.max(),
- requires_grad=False)
- else:
- raise ValueError(
- f"Unknown scheme {self.quant_config.activation_scheme}")
- def apply(self,
- layer: torch.nn.Module,
- x: torch.Tensor,
- bias: Optional[torch.Tensor] = None) -> torch.Tensor:
- # ops.scaled_fp8_quant supports both dynamic and static quant.
- # If dynamic, layer.input_scale is None and x_scale computed from x.
- # If static, layer.input_scale is scalar and x_scale is input_scale.
- if bias is None and self.cutlass_fp8_supported:
- qinput, x_scale = ops.scaled_fp8_quant(x, layer.input_scale)
- # Fused GEMM_DQ
- output = ops.cutlass_scaled_mm(
- qinput,
- layer.weight,
- out_dtype=x.dtype,
- scale_a=x_scale,
- scale_b=layer.weight_scale,
- )
- else:
- qinput, x_scale = ops.scaled_fp8_quant(x,
- layer.input_scale,
- batch_dim_padding=17)
- # Fused GEMM_DQ -- note we padded the input above because
- # torch._scaled_mm is more performant for matrices with
- # batch dimension > 16. Note that this could change
- # in the future.
- output, _ = torch._scaled_mm(
- qinput,
- layer.weight,
- out_dtype=x.dtype,
- scale_a=x_scale,
- scale_b=layer.weight_scale,
- bias=bias,
- )
- return torch.narrow(output, 0, 0, x.shape[0])
- class Fp8KVCacheMethod(QuantizeMethodBase):
- """Supports loading kv-cache scaling factors from FP8 checkpoints.
- """
- def __init__(self, quant_config: Fp8Config):
- self.quant_config = quant_config
- def create_weights(self, layer: torch.nn.Module):
- """Create "weight" (aka kv_scale) for an attention layer.
- Args:
- layer: The layer that is using the QuantizeMethodBase factory.
- """
- # Initialize the KV cache scale to 1.0 as the default value.
- # If the kv_scale appears in the checkpoint, it will be
- # overwritten when loading weights.
- layer.kv_scale = Parameter(torch.tensor(1.0), requires_grad=False)
- def apply(self, layer: torch.nn.Module) -> torch.Tensor:
- raise RuntimeError("Fp8KVCacheMethod.apply should not be called.")
- def process_weights_after_loading(self, layer: Module) -> None:
- # If the kv-cache dtype is auto, we enforce the kv-scale to be 1.0
- # regardless whether the kv-scale is available in the checkpoint.
- if layer.kv_cache_dtype != "auto":
- kv_scale = layer.kv_scale.to("cpu").tolist()
- if not isinstance(kv_scale, float):
- raise ValueError("Only support per-tensor scaling factor "
- "for fp8 KV cache")
- layer._kv_scale = kv_scale
- if layer._kv_scale == 1.0 and "e5m2" not in layer.kv_cache_dtype:
- print_warning_once(
- "Using KV cache scaling factor 1.0 for fp8_e4m3. This may "
- "cause accuracy issues. Please make sure kv-cache scaling "
- "factor is available in the fp8 checkpoint.")
- del layer.kv_scale
- def all_close_1d(x: torch.Tensor) -> bool:
- assert len(x.shape) == 1
- return all(torch.allclose(x[0], x[i]) for i in range(x.shape[0]))
- def per_tensor_quantize(tensor: torch.Tensor,
- inv_scale: Union[float, torch.Tensor]) -> torch.Tensor:
- finfo = torch.finfo(torch.float8_e4m3fn)
- qweight = (tensor / inv_scale).clamp(min=finfo.min, max=finfo.max)
- return qweight.to(torch.float8_e4m3fn)
- def per_tensor_dequantize(
- tensor: torch.Tensor, inv_scale: Union[float,
- torch.Tensor]) -> torch.Tensor:
- fake_qweight = tensor.to(torch.float16)
- dq_weight = fake_qweight * inv_scale
- return dq_weight
|