schema.py
  1. """
  2. This file contains the Pydantic schemas for various quantization-related
  3. parameters. When a relevant quantization technique is specified, these
  4. parameters are loaded in the form of a JSON alongside the model weights
  5. and augment the model with additional information needed for use of that
  6. technique. The format of this JSON should be specified by one or more
  7. schemas contained here.
  8. For example, when the KV cache is quantized to FP8-E4M3 (currently only
  9. possible on ROCm), the model can be optionally augmented with KV cache
  10. scaling factors.
  11. """
  12. from typing import Dict, Optional
  13. from pydantic import BaseModel, ConfigDict, ValidationInfo, model_validator
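
# Illustrative example (assumed values, not taken from any real checkpoint):
# a scaling-factor JSON handled by these schemas could look roughly like the
# following for a hypothetical 2-layer model served with tensor parallel
# size 1:
#
#     {
#         "model_type": "llama",
#         "kv_cache": {
#             "dtype": "float8_e4m3fn",
#             "scaling_factor": {"0": {"0": 0.0213, "1": 0.0198}}
#         }
#     }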


class KVCacheQuantSchema(BaseModel):
    dtype: str
    # Each key is a TP rank. Each value is a dictionary mapping a TP rank's
    # layer indices to their per-tensor KV cache scaling factor.
    # TODO: Consider pulling this and its validation methods out into its
    # own schema class (tricky as its members are variable)
    scaling_factor: Dict[int, Dict[int, float]]

    @model_validator(mode="after")
    def check_is_fp8(self) -> "KVCacheQuantSchema":
        assert self.dtype == "float8_e4m3fn", (
            "Loaded scaling factors intended for KV cache dtype = "
            f"{self.dtype} rather than float8_e4m3fn!")
        return self

    @model_validator(mode="after")
    def check_tp_ranks(self, info: ValidationInfo) -> "KVCacheQuantSchema":
        context = info.context
        if context:
            tp_size = context["tp_size"]
            num_hidden_layers = context["num_hidden_layers"]
            assert len(self.scaling_factor) == tp_size, (
                f"Loaded dictionary has TP size {len(self.scaling_factor)} "
                f"but LLM engine is currently running with TP size {tp_size}.")
            for tp_rank, layer_maps in self.scaling_factor.items():
                assert len(layer_maps) == num_hidden_layers, (
                    f"KV cache scales map for TP rank {tp_rank} is malformed. "
                    f"Expected {num_hidden_layers} layers, got "
                    f"{len(layer_maps)}.")
            for i in range(tp_size):
                assert (i in self.scaling_factor
                        ), f"KV cache scales map for TP rank {i} not found."
        return self

    @model_validator(mode="after")
    def check_current_rank(self, info: ValidationInfo) -> "KVCacheQuantSchema":
        context = info.context
        if context:
            tp_rank = context["tp_rank"]
            num_hidden_layers = context["num_hidden_layers"]
            layer_scales_map = self.scaling_factor[tp_rank]
            for i in range(num_hidden_layers):
                assert i in layer_scales_map, (
                    f"Could not find KV cache scales for layer {i} in "
                    f"TP rank {tp_rank}.")
        return self


class QuantParamSchema(BaseModel):
    # TODO: Generalize and extend with more fields
    # (e.g. weights/activations params) once functionality is enabled
    model_config = ConfigDict(protected_namespaces=())
    model_type: Optional[str]
    kv_cache: KVCacheQuantSchema

    @model_validator(mode="after")
    def check_model_type(self, info: ValidationInfo) -> "QuantParamSchema":
        context = info.context
        if context:
            model_type = context.get("model_type", None)
            if model_type is not None:
                assert model_type == self.model_type, (
                    f"Model type is {model_type} but loaded "
                    f"scaling factors belonging to different "
                    f"model type {self.model_type}!")
        return self
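

# ---------------------------------------------------------------------------
# Usage sketch: validating a payload shaped like the JSON example above.
# The sample values and the validation context (tp_size=1, tp_rank=0,
# num_hidden_layers=2, model_type="llama") are illustrative assumptions,
# not taken from any real checkpoint.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_payload = {
        "model_type": "llama",
        "kv_cache": {
            "dtype": "float8_e4m3fn",
            # TP rank 0 -> per-layer scaling factors for a 2-layer model.
            "scaling_factor": {0: {0: 0.0213, 1: 0.0198}},
        },
    }
    # The context mirrors what the engine would know at load time; the
    # validators above cross-check the payload against it.
    schema = QuantParamSchema.model_validate(
        sample_payload,
        context={
            "model_type": "llama",
            "tp_size": 1,
            "tp_rank": 0,
            "num_hidden_layers": 2,
        },
    )
    print(schema.kv_cache.scaling_factor[0])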