123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- from typing import List, Optional
- import torch
- import torch.types
- from aphrodite.common.utils import is_pin_memory_available
class LoRALayerWeights:
    """LoRA weights for a layer composed of two low rank matrices.

    ``lora_a`` and ``lora_b`` hold the two factors. When ``rank`` is ``None``
    the object instead carries a single full rank tensor (ModulesToSave):
    ``lora_a`` is ``None`` and ``lora_b`` is the full tensor.
    """

    def __init__(
        self,
        module_name: str,
        rank: Optional[int],
        lora_alpha: int,
        lora_a: Optional[torch.Tensor],
        lora_b: torch.Tensor,
        embeddings_tensor: Optional[torch.Tensor] = None,
        scaling: Optional[float] = None,
    ) -> None:
        """Store the tensors and derive the scaling factor.

        Args:
            module_name: Name of the module these weights belong to.
            rank: LoRA rank, or ``None`` for full rank tensors
                (ModulesToSave); in that case ``lora_a`` is ``None`` and
                ``lora_b`` is the full rank tensor.
            lora_alpha: LoRA alpha; used to derive the scaling when none is
                given explicitly.
            lora_a: First low rank matrix, or ``None`` for full rank weights.
            lora_b: Second low rank matrix, or the full rank tensor.
            embeddings_tensor: Optional extra embeddings tensor.
            scaling: Explicit scaling factor. When omitted and ``rank`` is
                set, ``lora_alpha / rank`` is used; when omitted and ``rank``
                is ``None``, the scaling stays ``None``.
        """
        self.module_name = module_name
        self.rank = rank
        self.lora_alpha = lora_alpha
        self.lora_a = lora_a
        self.lora_b = lora_b
        self.embeddings_tensor = embeddings_tensor

        self.scaling: Optional[float]
        if scaling is None and self.rank is not None:
            self.scaling = self.lora_alpha / self.rank
        else:
            self.scaling = scaling

    def optimize(self) -> "LoRALayerWeights":
        """Optimize the LoRA by merging the scaling into lora_b.

        ``lora_b`` is multiplied in place and ``scaling`` is reset to 1.
        Nothing happens when the scaling is already 1 or is ``None``
        (full rank weights created without an explicit scaling), since there
        is no factor to merge in either case.

        Returns:
            ``self``, to allow chaining.
        """
        # A None scaling would otherwise reach `tensor *= None` and raise.
        if self.scaling is None or self.scaling == 1:
            return self
        self.lora_b *= self.scaling
        self.scaling = 1
        return self

    @property
    def input_dim(self) -> int:
        """First dimension of ``lora_a``, or of ``lora_b`` if ``lora_a`` is None."""
        if self.lora_a is not None:
            return self.lora_a.shape[0]
        return self.lora_b.shape[0]

    @property
    def output_dim(self) -> int:
        """Second dimension of ``lora_b``."""
        return self.lora_b.shape[1]

    @property
    def is_packed(self) -> bool:
        """Always ``False`` for a plain (non-packed) LoRA layer."""
        return False

    @property
    def extra_vocab_size(self) -> int:
        """Number of rows in ``embeddings_tensor``, or 0 when there is none."""
        if self.embeddings_tensor is None:
            return 0
        return self.embeddings_tensor.shape[0]

    @classmethod
    def create_dummy_lora_weights(
        cls,
        module_name: str,
        input_dim: int,
        output_dim: int,
        rank: Optional[int],
        dtype: torch.dtype,
        device: torch.types.Device,
        embeddings_tensor_dim: Optional[int] = None,
    ) -> "LoRALayerWeights":
        """Build placeholder weights of the requested shape.

        Args:
            module_name: Name of the module the dummy weights belong to.
            input_dim: Input dimension of the layer.
            output_dim: Output dimension of the layer.
            rank: LoRA rank. ``None`` creates a single zero tensor of shape
                ``[input_dim, output_dim]`` in ``lora_b`` with scaling 1 and
                no embeddings tensor.
            dtype: dtype of the created tensors.
            device: Device on which the tensors are created.
            embeddings_tensor_dim: When truthy (and ``rank`` is not None), a
                random ``[10, embeddings_tensor_dim]`` embeddings tensor is
                created as well.

        Returns:
            A new instance with ``lora_alpha=1``.
        """
        # Pinned memory is only requested for CPU tensors.
        pin_memory = str(device) == "cpu" and is_pin_memory_available()
        tensor_kwargs = dict(dtype=dtype, device=device, pin_memory=pin_memory)

        if rank is None:
            lora_a = None
            lora_b = torch.zeros([input_dim, output_dim], **tensor_kwargs)
            embeddings_tensor = None
            scaling = 1
        else:
            lora_a = torch.zeros([input_dim, rank], **tensor_kwargs)
            lora_b = torch.zeros([rank, output_dim], **tensor_kwargs)
            # Let the constructor derive the scaling from lora_alpha / rank.
            scaling = None
            if embeddings_tensor_dim:
                embeddings_tensor = torch.rand(10, embeddings_tensor_dim,
                                               **tensor_kwargs)
            else:
                embeddings_tensor = None

        return cls(
            module_name,
            rank=rank,
            lora_alpha=1,
            lora_a=lora_a,
            lora_b=lora_b,
            scaling=scaling,
            embeddings_tensor=embeddings_tensor,
        )

    def lora_a_pin_memory(self):
        """Replace ``lora_a`` with its pinned-memory copy, if it exists."""
        if self.lora_a is not None:
            self.lora_a = self.lora_a.pin_memory()

    def lora_b_pin_memory(self):
        """Replace ``lora_b`` with its pinned-memory copy."""
        self.lora_b = self.lora_b.pin_memory()
class PackedLoRALayerWeights(LoRALayerWeights):
    """LoRA used for packed layers (eg. qkv_proj).

    ``lora_a``, ``lora_b``, ``lora_alphas`` and ``scaling`` are lists with one
    entry per packed submodule; an entry is ``None`` when that submodule has
    no LoRA.
    """

    def __init__(
        self,
        module_name: str,
        rank: Optional[int],
        lora_alphas: List[Optional[int]],
        lora_a: List[Optional[torch.Tensor]],
        lora_b: List[Optional[torch.Tensor]],
        scaling: Optional[List[float]] = None,
    ) -> None:
        """Store the per-submodule tensors and derive per-submodule scalings.

        Args:
            module_name: Name of the packed module.
            rank: Rank shared by the packed LoRAs, or ``None``.
            lora_alphas: One alpha per submodule (``None`` where absent).
            lora_a: One first matrix per submodule (``None`` where absent).
            lora_b: One second matrix per submodule (``None`` where absent).
            scaling: Optional per-submodule scalings. When omitted and
                ``rank`` is set, each is computed as ``lora_alpha / rank``
                (``None`` where the alpha is ``None``).
        """
        super().__init__(
            module_name=module_name,
            rank=rank,
            lora_alpha=0,
            lora_a=lora_a,  # type: ignore
            lora_b=lora_b,  # type: ignore
            scaling=scaling,  # type: ignore
            embeddings_tensor=None,
        )
        self.lora_alphas = lora_alphas
        if scaling is None and self.rank is not None:
            # A missing alpha means the submodule has no LoRA; dividing None
            # by the rank would raise, so carry the None through instead.
            self.scaling = [  # type: ignore
                None if lora_alpha is None else lora_alpha / self.rank
                for lora_alpha in self.lora_alphas
            ]

    @classmethod
    def pack(
        cls, loras: List[Optional["LoRALayerWeights"]]
    ) -> "PackedLoRALayerWeights":
        """Pack a list of LoRAs into a single LoRA.

        If LoRA is None, it signifies that the submodule does not have a LoRA.
        Every present LoRA is optimized first (its scaling is merged into its
        ``lora_b``), so the packed object uses a scaling of 1 for each of
        them. The module name and rank are taken from the first present LoRA.

        Raises:
            ValueError: If ``loras`` contains no LoRA at all.
        """
        first_lora = next((lora for lora in loras if lora is not None), None)
        if first_lora is None:
            # Avoid leaking a bare StopIteration out of next(), which could
            # silently end an enclosing iteration.
            raise ValueError(
                "Cannot pack LoRAs: every entry in `loras` is None.")

        for lora in loras:
            if lora is not None:
                lora.optimize()

        return cls(
            first_lora.module_name,
            first_lora.rank,
            [lora.lora_alpha if lora is not None else None for lora in loras],
            [lora.lora_a if lora is not None else None for lora in loras],
            [lora.lora_b if lora is not None else None for lora in loras],
            scaling=[
                1 if lora is not None else None  # type: ignore
                for lora in loras
            ])

    def optimize(self) -> "PackedLoRALayerWeights":
        """Optimize the LoRA by merging the scaling into lora_b.

        Each present ``lora_b`` entry is multiplied in place by its scaling,
        which is then reset to 1. Entries without a tensor, without a
        scaling, or whose scaling is already 1 are left untouched.

        Returns:
            ``self``, to allow chaining.
        """
        if self.scaling is None:
            # No scalings were provided or derived; nothing to merge.
            return self
        for i, lora_b in enumerate(self.lora_b):  # type: ignore
            scale = self.scaling[i]  # type: ignore
            if lora_b is None or scale is None or scale == 1:
                continue
            self.lora_b[i] *= scale  # type: ignore
            self.scaling[i] = 1  # type: ignore
        return self

    @property
    def input_dim(self) -> int:
        """Not defined for packed layers."""
        raise NotImplementedError()

    @property
    def output_dim(self) -> int:
        """Not defined for packed layers."""
        raise NotImplementedError()

    @property
    def is_packed(self) -> bool:
        """Always ``True`` for a packed LoRA layer."""
        return True
|