- import argparse
- import dataclasses
- import io
- import os
- import re
- import time
- from dataclasses import dataclass
- from functools import partial
- from typing import BinaryIO, Generator, Optional, Tuple, Type, Union
- import torch
- from loguru import logger
- from torch import nn
- from transformers import PretrainedConfig
- import aphrodite.common.envs as envs
- from aphrodite.common.config import ModelConfig, ParallelConfig
- from aphrodite.engine.aphrodite_engine import AphroditeEngine
- from aphrodite.engine.args_tools import EngineArgs
- from aphrodite.modeling.layers.vocab_parallel_embedding import (
- VocabParallelEmbedding)
- from aphrodite.quantization.base_config import QuantizationConfig
- tensorizer_error_msg = None
- try:
- from tensorizer import (DecryptionParams, EncryptionParams,
- TensorDeserializer, TensorSerializer)
- from tensorizer.stream_io import open_stream
- from tensorizer.utils import (convert_bytes, get_mem_usage,
- no_init_or_tensor)
- _read_stream, _write_stream = (partial(
- open_stream,
- mode=mode,
- ) for mode in ("rb", "wb+"))
- except ImportError as e:
- tensorizer_error_msg = e
- __all__ = [
- 'EncryptionParams', 'DecryptionParams', 'TensorDeserializer',
- 'TensorSerializer', 'open_stream', 'convert_bytes', 'get_mem_usage',
- 'no_init_or_tensor', 'TensorizerConfig'
- ]
- @dataclass
- class TensorizerConfig:
- tensorizer_uri: str
- aphrodite_tensorized: Optional[bool] = False
- verify_hash: Optional[bool] = False
- num_readers: Optional[int] = None
- encryption_keyfile: Optional[str] = None
- s3_access_key_id: Optional[str] = None
- s3_secret_access_key: Optional[str] = None
- s3_endpoint: Optional[str] = None
- model_class: Optional[Type[torch.nn.Module]] = None
- hf_config: Optional[PretrainedConfig] = None
- dtype: Optional[Union[str, torch.dtype]] = None
- _is_sharded: bool = False
- def __post_init__(self):
- # check if the configuration is for a sharded Aphrodite model
- self._is_sharded = isinstance(self.tensorizer_uri, str) \
- and re.search(r'%0\dd', self.tensorizer_uri) is not None
- def _construct_tensorizer_args(self) -> "TensorizerArgs":
- tensorizer_args = {
- "tensorizer_uri": self.tensorizer_uri,
- "aphrodite_tensorized": self.aphrodite_tensorized,
- "verify_hash": self.verify_hash,
- "num_readers": self.num_readers,
- "encryption_keyfile": self.encryption_keyfile,
- "s3_access_key_id": self.s3_access_key_id,
- "s3_secret_access_key": self.s3_secret_access_key,
- "s3_endpoint": self.s3_endpoint,
- }
- return TensorizerArgs(**tensorizer_args)
- def verify_with_parallel_config(
- self,
- parallel_config: "ParallelConfig",
- ) -> None:
- if parallel_config.tensor_parallel_size > 1 \
- and not self._is_sharded:
- raise ValueError(
- "For a sharded model, tensorizer_uri should include a"
- " string format template like '%04d' to be formatted"
- " with the rank of the shard")
- def verify_with_model_config(self, model_config: "ModelConfig") -> None:
- if (model_config.quantization is not None
- and self.tensorizer_uri is not None):
- logger.warning(
- "Loading a model using Tensorizer with quantization on "
- "aphrodite is unstable and may lead to errors.")
- def load_with_tensorizer(tensorizer_config: TensorizerConfig,
- **extra_kwargs) -> nn.Module:
- tensorizer = TensorizerAgent(tensorizer_config, **extra_kwargs)
- return tensorizer.deserialize()
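- # Illustrative sketch (the URI below is hypothetical): a sharded
- # (tensor-parallel) model must embed a printf-style rank template such as
- # '%04d' in tensorizer_uri; __post_init__ detects this and
- # verify_with_parallel_config enforces it. The model loader is expected to
- # fill in model_class, hf_config and dtype before load_with_tensorizer is
- # called.
- #
- #   config = TensorizerConfig(
- #       tensorizer_uri="s3://my-bucket/llama-7b/model-rank-%04d.tensors",
- #       num_readers=None,
- #   )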
- @dataclass
- class TensorizerArgs:
- tensorizer_uri: Union[io.BufferedIOBase, io.RawIOBase, BinaryIO, str,
- bytes, os.PathLike, int]
- aphrodite_tensorized: Optional[bool] = False
- verify_hash: Optional[bool] = False
- num_readers: Optional[int] = None
- encryption_keyfile: Optional[str] = None
- s3_access_key_id: Optional[str] = None
- s3_secret_access_key: Optional[str] = None
- s3_endpoint: Optional[str] = None
- """
- Args for the TensorizerAgent class. These are used to configure the behavior
- of the TensorDeserializer when loading tensors from a serialized model.
-
- Args:
- tensorizer_uri: Path to serialized model tensors. Can be a local file
- path or a S3 URI.
- aphrodite_tensorized: If True, indicates that the serialized model is an
- aphrodite model. This is used to determine the behavior of the
- TensorDeserializer when loading tensors from a serialized model.
- It is far faster to deserialize an aphrodite model, as it utilizes
- tensorizer's optimized GPU loading. Note that this flag is now
- deprecated, as serialized Aphrodite models are detected
- automatically.
- verify_hash: If True, the hashes of each tensor will be verified against
- the hashes stored in the metadata. A `HashMismatchError` will be
- raised if any of the hashes do not match.
- num_readers: Controls how many threads are allowed to read concurrently
- from the source file. Default is `None`, which will dynamically set
- the number of readers based on the available resources and the
- model size. This greatly increases performance.
- encryption_keyfile: File path to a binary file containing a
- binary key to use for decryption. `None` (the default) means
- no decryption. See the example script in
- examples/tensorize_aphrodite_model.py.
- s3_access_key_id: The access key for the S3 bucket. Can also be set via
- the S3_ACCESS_KEY_ID environment variable.
- s3_secret_access_key: The secret access key for the S3 bucket. Can also
- be set via the S3_SECRET_ACCESS_KEY environment variable.
- s3_endpoint: The endpoint for the S3 bucket. Can also be set via the
- S3_ENDPOINT_URL environment variable.
- """
- def __post_init__(self):
- self.file_obj = self.tensorizer_uri
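- # Fall back to S3 credentials from the environment when they are not
- # provided explicitly; normalize empty values to None.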
- self.s3_access_key_id = (self.s3_access_key_id
- or envs.S3_ACCESS_KEY_ID) or None
- self.s3_secret_access_key = (
- self.s3_secret_access_key
- or envs.S3_SECRET_ACCESS_KEY) or None
- self.s3_endpoint = (self.s3_endpoint
- or envs.S3_ENDPOINT_URL) or None
- self.stream_params = {
- "s3_access_key_id": self.s3_access_key_id,
- "s3_secret_access_key": self.s3_secret_access_key,
- "s3_endpoint": self.s3_endpoint,
- }
- self.deserializer_params = {
- "verify_hash": self.verify_hash,
- "encryption": self.encryption_keyfile,
- "num_readers": self.num_readers
- }
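- # When an encryption keyfile is given, read the raw key (possibly from S3)
- # and replace the keyfile path with DecryptionParams for the deserializer.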
- if self.encryption_keyfile:
- with open_stream(
- self.encryption_keyfile,
- **self.stream_params,
- ) as stream:
- key = stream.read()
- decryption_params = DecryptionParams.from_key(key)
- self.deserializer_params['encryption'] = decryption_params
- @staticmethod
- def add_cli_args(
- parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
- """Tensorizer CLI arguments"""
- # Tensorizer options arg group
- group = parser.add_argument_group(
- 'tensorizer options',
- description=('Options for configuring the behavior of the'
- ' tensorizer deserializer when load_format=tensorizer'
- ' is used to initialize an AphroditeEngine, either via'
- ' the CLI when running the Aphrodite OpenAI inference'
- ' server (as a JSON string passed to'
- ' --model-loader-extra-config) or programmatically as'
- ' arguments to TensorizerConfig, passed via'
- ' model_loader_extra_config in the AphroditeEngine'
- ' constructor.'))
- group.add_argument(
- "--tensorizer-uri",
- help="Path to serialized model tensors. Can be a local file path,"
- " or an HTTP(S) or S3 URI.",
- )
- group.add_argument(
- "--verify-hash",
- action="store_true",
- help="If enabled, the hashes of each tensor will be verified"
- " against the hashes stored in the file metadata. An exception"
- " will be raised if any of the hashes do not match.",
- )
- group.add_argument(
- "--encryption-keyfile",
- default=None,
- help="The file path to a binary file containing a binary key to "
- "use for decryption. Can be a file path or S3 network URI.")
- group.add_argument(
- "--num-readers",
- default=None,
- type=int,
- help="Controls how many threads are allowed to read concurrently "
- "from the source file. Default is `None`, which will dynamically "
- "set the number of readers based on the available resources "
- "and model size. This greatly increases performance.")
- group.add_argument(
- "--s3-access-key-id",
- default=None,
- help="The access key for the S3 bucket. Can also be set via the "
- "S3_ACCESS_KEY_ID environment variable.",
- )
- group.add_argument(
- "--s3-secret-access-key",
- default=None,
- help="The secret access key for the S3 bucket. Can also be set via "
- "the S3_SECRET_ACCESS_KEY environment variable.",
- )
- group.add_argument(
- "--s3-endpoint",
- default=None,
- help="The endpoint for the S3 bucket. Can also be set via the "
- "S3_ENDPOINT_URL environment variable.",
- )
- return parser
- @classmethod
- def from_cli_args(cls, args: argparse.Namespace) -> "TensorizerArgs":
- attrs = [attr.name for attr in dataclasses.fields(cls)]
- tensorizer_args = cls(**{
- attr: getattr(args, attr)
- for attr in attrs if hasattr(args, attr)
- })
- return tensorizer_args
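- # Illustrative sketch (the argument values are hypothetical):
- #
- #   parser = argparse.ArgumentParser()
- #   parser = TensorizerArgs.add_cli_args(parser)
- #   args = parser.parse_args(["--tensorizer-uri", "model.tensors",
- #                             "--num-readers", "8"])
- #   tensorizer_args = TensorizerArgs.from_cli_args(args)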
- class TensorizerAgent:
- """
- A class for performing tensorizer deserialization specifically for
- aphrodite models using plaid_mode. Uses TensorizerArgs to configure the
- behavior of the TensorDeserializer when loading tensors from a serialized
- model. For deserialization of HuggingFace models, TensorDeserializer is
- instead used directly as an iterator in hf_model_weights_iterator
- in aphrodite/modeling/model_loader/weight_utils.py.
- """
- def __init__(self, tensorizer_config: TensorizerConfig,
- quant_config: QuantizationConfig, **extra_kwargs):
- if tensorizer_error_msg is not None:
- raise ImportError(
- "Tensorizer is not installed. Please install tensorizer "
- "to use this feature with "
- "`pip install aphrodite-engine[tensorizer]`. "
- "Error message: {}".format(tensorizer_error_msg))
- self.tensorizer_config = tensorizer_config
- self.tensorizer_args = (
- self.tensorizer_config._construct_tensorizer_args())
- self.extra_kwargs = extra_kwargs
- if extra_kwargs.get("quant_config", None) is not None:
- self.quant_config = extra_kwargs["quant_config"]
- else:
- self.quant_config = quant_config
- self.model = self._init_model()
- def _init_model(self):
- model_args = self.tensorizer_config.hf_config
- model_args.torch_dtype = self.tensorizer_config.dtype
- with no_init_or_tensor():
- return self.tensorizer_config.model_class(
- config=model_args,
- quant_config=self.quant_config,
- **self.extra_kwargs)
- def _resize_lora_embeddings(self):
- """Modify LoRA embedding layers to use bigger tensors
- to allow for adapter added tokens."""
- for child in self.model.modules():
- if (isinstance(child, VocabParallelEmbedding)
- and child.weight.shape[0] <
- child.num_embeddings_per_partition):
- new_weight = torch.empty(child.num_embeddings_per_partition,
- child.embedding_dim,
- dtype=child.weight.dtype,
- device=child.weight.device)
- new_weight[:child.weight.shape[0]].copy_(child.weight.data)
- new_weight[child.weight.shape[0]:].fill_(0)
- child.weight.data = new_weight
- def _check_tensors_on_meta_device(self):
- for tensor in self.model.state_dict().values():
- if tensor.device.type == 'meta':
- raise ValueError(
- "The serialized model contains tensors on the meta device,"
- " indicating that some tensors were not loaded properly."
- " Please check that the parameters of the model being"
- " specified match that of the serialized model, such as"
- " its quantization.")
- def deserialize(self):
- """
- Deserialize the model using the TensorDeserializer. This method is
- specifically for Aphrodite models using tensorizer's plaid_mode.
- The deserializer makes use of tensorizer_args.stream_params
- to configure the behavior of the stream when loading tensors from a
- serialized model. The deserializer_params are used to configure the
- behavior of the TensorDeserializer when loading tensors themselves.
- Documentation on these params can be found in TensorizerArgs.
- Returns:
- nn.Module: The deserialized model.
- """
- before_mem = get_mem_usage()
- start = time.perf_counter()
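- # Open the (possibly remote) source stream and let the deserializer load
- # tensors directly onto the current CUDA device.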
- with _read_stream(
- self.tensorizer_config.tensorizer_uri,
- **self.tensorizer_args.stream_params
- ) as stream, TensorDeserializer(
- stream,
- dtype=self.tensorizer_config.dtype,
- device=f'cuda:{torch.cuda.current_device()}',
- **self.tensorizer_args.deserializer_params) as deserializer:
- deserializer.load_into_module(self.model)
- end = time.perf_counter()
- total_bytes_str = convert_bytes(deserializer.total_tensor_bytes)
- duration = end - start
- per_second = convert_bytes(deserializer.total_tensor_bytes / duration)
- after_mem = get_mem_usage()
- deserializer.close()
- logger.info(f"Deserialized {total_bytes_str} in "
- f"{end - start:0.2f}s, {per_second}/s")
- logger.info(f"Memory usage before: {before_mem}")
- logger.info(f"Memory usage after: {after_mem}")
- self._check_tensors_on_meta_device()
- self._resize_lora_embeddings()
- del self.model.aphrodite_tensorized_marker
- return self.model.eval()
- def tensorizer_weights_iterator(
- tensorizer_args: "TensorizerArgs"
- ) -> Generator[Tuple[str, torch.Tensor], None, None]:
- logger.warning(
- "Deserializing HuggingFace models is not optimized for "
- "loading on Aphrodite, as tensorizer is forced to load to CPU. "
- "Consider deserializing an Aphrodite model instead for faster "
- "load times. See examples/tensorize_aphrodite_model.py "
- "for serializing Aphrodite models.")
- deserializer_args = tensorizer_args.deserializer_params
- stream_params = tensorizer_args.stream_params
- stream = open_stream(tensorizer_args.tensorizer_uri, **stream_params)
- with TensorDeserializer(stream, **deserializer_args,
- device="cpu") as state:
- for name, param in state.items():
- yield name, param
- del state
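- # Illustrative sketch (assumes `args` is a TensorizerArgs pointing at a
- # HuggingFace-serialized checkpoint):
- #
- #   state_dict = {name: tensor.clone()
- #                 for name, tensor in tensorizer_weights_iterator(args)}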
- def is_aphrodite_tensorized(tensorizer_config: "TensorizerConfig") -> bool:
- """
- Infer if the model is an Aphrodite model by checking the weights for
- an Aphrodite tensorized marker.
- Args:
- tensorizer_config: The TensorizerConfig object containing the
- tensorizer_uri to the serialized model.
- Returns:
- bool: True if the model is an Aphrodite model, False otherwise.
- """
- tensorizer_args = tensorizer_config._construct_tensorizer_args()
- deserializer = TensorDeserializer(open_stream(
- tensorizer_args.tensorizer_uri, **tensorizer_args.stream_params),
- **tensorizer_args.deserializer_params,
- lazy_load=True)
- if tensorizer_config.aphrodite_tensorized:
- logger.warning(
- "Please note that newly serialized Aphrodite models are "
- "automatically inferred as Aphrodite models, so setting "
- "aphrodite_tensorized=True is only necessary for models serialized "
- "prior to this change.")
- return True
- if (".aphrodite_tensorized_marker" in deserializer):
- return True
- return False
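- # Illustrative sketch: is_aphrodite_tensorized can be used to choose between
- # the fast Aphrodite deserialization path (load_with_tensorizer, via
- # TensorizerAgent) and the slower CPU-bound HuggingFace path
- # (tensorizer_weights_iterator).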
- def serialize_aphrodite_model(
- model: nn.Module,
- tensorizer_config: TensorizerConfig,
- ) -> nn.Module:
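- # Register a tiny marker parameter on the meta device so that
- # is_aphrodite_tensorized can later identify the checkpoint as an
- # Aphrodite-serialized model.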
- model.register_parameter(
- "aphrodite_tensorized_marker",
- nn.Parameter(torch.tensor((1, ), device="meta"), requires_grad=False))
- tensorizer_args = tensorizer_config._construct_tensorizer_args()
- encryption_params = None
- if (keyfile := tensorizer_config.encryption_keyfile) is not None:
- with open(keyfile, "rb") as f:
- key = f.read()
- encryption_params = EncryptionParams(key=key)
- output_file = tensorizer_args.tensorizer_uri
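- # For sharded serialization, substitute the tensor-parallel rank into the
- # printf-style URI template (e.g. '%04d').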
- if tensorizer_config._is_sharded:
- from aphrodite.distributed import get_tensor_model_parallel_rank
- output_file = output_file % get_tensor_model_parallel_rank()
- with _write_stream(output_file, **tensorizer_args.stream_params) as stream:
- serializer = TensorSerializer(stream, encryption=encryption_params)
- serializer.write_module(model)
- serializer.close()
- logger.info(f"Successfully serialized model to {str(output_file)}")
- return model
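- # Illustrative sketch (the paths are hypothetical). To serialize with
- # encryption, write a key first and point encryption_keyfile at it:
- #
- #   with open("model.key", "wb") as f:
- #       f.write(EncryptionParams.random().key)
- #   serialize_aphrodite_model(
- #       model,
- #       TensorizerConfig(tensorizer_uri="model.tensors",
- #                        encryption_keyfile="model.key"),
- #   )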
- def tensorize_aphrodite_model(engine_args: EngineArgs,
- tensorizer_config: TensorizerConfig,
- generate_keyfile: bool = True):
- """Utility to load a model and then serialize it with Tensorizer
- Intended to be used separately from running an aphrodite server since it
- creates its own Engine instance.
- """
- engine_config = engine_args.create_engine_config()
- tensorizer_config.verify_with_model_config(engine_config.model_config)
- tensorizer_config.verify_with_parallel_config(
- engine_config.parallel_config)
- # generate the encryption key before creating the engine to support
- # sharding
- if generate_keyfile and (keyfile :=
- tensorizer_config.encryption_keyfile) is not None:
- encryption_params = EncryptionParams.random()
- with _write_stream(
- keyfile,
- s3_access_key_id=tensorizer_config.s3_access_key_id,
- s3_secret_access_key=tensorizer_config.s3_secret_access_key,
- s3_endpoint=tensorizer_config.s3_endpoint,
- ) as stream:
- stream.write(encryption_params.key)
- engine = AphroditeEngine.from_engine_args(engine_args)
- if tensorizer_config._is_sharded:
- # If the engine is a distributed engine (for tensor parallelism), then
- # each worker shard needs to serialize its part of the model.
- engine.model_executor._run_workers(
- "save_tensorized_model",
- tensorizer_config=tensorizer_config,
- )
- else:
- # with a single worker, we can get to the underlying model directly
- serialize_aphrodite_model(
- engine.model_executor.driver_worker.model_runner.model,
- tensorizer_config,
- )
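- # Illustrative sketch (model name and output paths are hypothetical):
- #
- #   engine_args = EngineArgs(model="facebook/opt-125m")
- #   config = TensorizerConfig(
- #       tensorizer_uri="s3://my-bucket/opt-125m/model.tensors",
- #       encryption_keyfile="s3://my-bucket/opt-125m/model.key",
- #   )
- #   tensorize_aphrodite_model(engine_args, config)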