worker_manager.py

from abc import ABC, abstractmethod, abstractproperty
from typing import Any, Dict, List, Optional, Set, Type

import torch

from aphrodite.common.config import LoRAConfig
from aphrodite.lora.layers import LoRAMapping
from aphrodite.lora.models import (LoRAModel, LoRAModelManager,
                                   LRUCacheLoRAModelManager,
                                   create_lora_manager)
from aphrodite.lora.request import LoRARequest


class AbstractWorkerLoRAManager(ABC):
    """Abstract class for managing LoRA models on the worker side."""

    def __init__(self, max_num_seqs: int, max_num_batched_tokens: int,
                 vocab_size: int, lora_config: LoRAConfig,
                 device: torch.device):
        self.max_num_seqs = max_num_seqs
        self.max_num_batched_tokens = max_num_batched_tokens
        self.vocab_size = vocab_size
        self.device = device
        self.lora_config = lora_config

    @abstractproperty
    def is_enabled(self) -> bool:
        ...

    @abstractmethod
    def create_lora_manager(
        self,
        model: torch.nn.Module,
    ) -> Any:
        ...

    @abstractmethod
    def set_active_loras(self, lora_requests: List[LoRARequest],
                         lora_mapping: LoRAMapping) -> None:
        ...

    @abstractmethod
    def add_lora(self, lora_request: LoRARequest) -> bool:
        ...

    @abstractmethod
    def add_dummy_lora(self, lora_request: LoRARequest, rank: int) -> bool:
        ...

    @abstractmethod
    def remove_lora(self, lora_id: int) -> bool:
        ...

    @abstractmethod
    def remove_all_loras(self) -> bool:
        ...

    @abstractmethod
    def list_loras(self) -> Set[int]:
        ...


class WorkerLoRAManager(AbstractWorkerLoRAManager):
    """WorkerLoRAManager that manages LoRA models on the worker side.

    Every request, the requested LoRAs will be loaded (unless they are already
    loaded), and every other LoRA will be unloaded."""

    _lora_manager_cls: Type[LoRAModelManager] = LoRAModelManager

    def __init__(
        self,
        max_num_seqs: int,
        max_num_batched_tokens: int,
        vocab_size: int,
        lora_config: LoRAConfig,
        device: torch.device,
        embedding_modules: Dict[str, str],
        embedding_padding_modules: List[str],
        lora_model_cls: Type[LoRAModel] = LoRAModel,
    ):
        self._lora_manager: Optional[LoRAModelManager] = None
        self._lora_model_cls = lora_model_cls
        self.embedding_modules = embedding_modules
        self.embedding_padding_modules = embedding_padding_modules
        super().__init__(max_num_seqs, max_num_batched_tokens, vocab_size,
                         lora_config, device)

    @property
    def is_enabled(self) -> bool:
        return True

    def create_lora_manager(
        self,
        model: torch.nn.Module,
    ) -> Any:
        lora_manager = create_lora_manager(
            model,
            max_num_seqs=self.max_num_seqs,
            max_num_batched_tokens=self.max_num_batched_tokens,
            vocab_size=self.vocab_size,
            lora_config=self.lora_config,
            lora_manager_cls=self._lora_manager_cls,
        )
        self._lora_manager: LoRAModelManager = lora_manager
        return lora_manager.model

    def set_active_loras(self, lora_requests: List[LoRARequest],
                         lora_mapping: LoRAMapping) -> None:
        self._apply_loras(lora_requests)
        self._lora_manager.set_lora_mapping(lora_mapping)

    def _apply_loras(self, lora_requests: List[LoRARequest]) -> None:
        loras_that_exist = self.list_loras()
        loras_map = {
            lora_request.lora_int_id: lora_request
            for lora_request in lora_requests if lora_request
        }
        if len(loras_map) > self._lora_manager.lora_slots:
            raise RuntimeError(
                f"Number of requested LoRAs ({len(loras_map)}) is greater "
                "than the number of GPU LoRA slots "
                f"({self._lora_manager.lora_slots}).")

        # Diff the requested set against what is already loaded: unload
        # everything that is no longer needed, then load the newcomers.
        new_loras = set(loras_map)
        loras_to_add = new_loras - loras_that_exist
        loras_to_remove = loras_that_exist - new_loras

        for lora_id in loras_to_remove:
            self.remove_lora(lora_id)

        for lora_id in loras_to_add:
            self.add_lora(loras_map[lora_id])

    def _load_lora(self, lora_request: LoRARequest) -> LoRAModel:
        try:
            model = self._lora_manager.model
            supported_lora_modules = model.supported_lora_modules
            packed_modules_mapping = model.packed_modules_mapping
            # Expand packed modules into the individual module names the
            # checkpoint is expected to contain.
            expected_lora_modules = []
            for module in supported_lora_modules:
                if module in packed_modules_mapping:
                    expected_lora_modules.extend(
                        packed_modules_mapping[module])
                else:
                    expected_lora_modules.append(module)
            lora = self._lora_model_cls.from_local_checkpoint(
                lora_request.lora_local_path,
                expected_lora_modules,
                lora_model_id=lora_request.lora_int_id,
                device="cpu",
                dtype=self.lora_config.lora_dtype,
                target_embedding_padding=self.vocab_size +
                self.lora_config.lora_extra_vocab_size,
                embedding_modules=self.embedding_modules,
                embedding_padding_modules=self.embedding_padding_modules,
            )
        except Exception as e:
            raise RuntimeError(
                f"Loading lora {lora_request.lora_local_path} failed") from e
        if lora.rank > self.lora_config.max_lora_rank:
            raise ValueError(
                f"LoRA rank {lora.rank} is greater than max_lora_rank "
                f"{self.lora_config.max_lora_rank}.")
        if lora.extra_vocab_size > self.lora_config.lora_extra_vocab_size:
            raise ValueError(f"LoRA added vocab size {lora.extra_vocab_size} "
                             f"is greater than lora_extra_vocab_size "
                             f"{self.lora_config.lora_extra_vocab_size}.")
        return lora

    def add_dummy_lora(self, lora_request: LoRARequest, rank: int) -> bool:
        if lora_request.lora_int_id in self.list_loras():
            return False
        return self._lora_manager.add_lora(
            self._lora_manager.create_dummy_lora(lora_request.lora_int_id,
                                                 rank, self.embedding_modules))

    def add_lora(self, lora_request: LoRARequest) -> bool:
        if lora_request.lora_int_id in self.list_loras():
            return False
        lora = self._load_lora(lora_request)
        loaded = self._lora_manager.add_lora(lora)
        self._lora_manager.activate_lora(lora.id)
        return loaded

    def remove_lora(self, lora_id: int) -> bool:
        return self._lora_manager.remove_lora(lora_id)

    def remove_all_loras(self) -> bool:
        self._lora_manager.remove_all_loras()

    def list_loras(self) -> Set[int]:
        return set(self._lora_manager.list_loras())
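

# Illustrative sketch (not part of Aphrodite): how a worker might drive
# WorkerLoRAManager for one batch. The sizes, device, adapter name/path, the
# empty embedding maps, and the LoRARequest/LoRAMapping constructor fields
# shown here are assumptions; the real worker derives all of these from its
# model config and scheduler output. The function is defined only as an
# example and is never called.
def _example_worker_lora_flow(model: torch.nn.Module,
                              lora_config: LoRAConfig) -> None:
    manager = WorkerLoRAManager(
        max_num_seqs=256,
        max_num_batched_tokens=8192,
        vocab_size=32000,
        lora_config=lora_config,
        device=torch.device("cuda"),
        embedding_modules={},           # model-specific in practice
        embedding_padding_modules=[],   # model-specific in practice
    )
    # Create the LoRAModelManager for this model; the returned model is the
    # one the worker should run.
    model = manager.create_lora_manager(model)

    # Per batch: declare which adapters the scheduled requests need and how
    # tokens map onto them; every adapter not listed here is unloaded.
    requests = [
        LoRARequest(lora_name="sql-adapter",          # field names assumed
                    lora_int_id=1,
                    lora_local_path="/path/to/adapter"),
    ]
    mapping = LoRAMapping(index_mapping=(1, 1, 1),    # field names assumed
                          prompt_mapping=(1,))
    manager.set_active_loras(requests, mapping)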


class LRUCacheWorkerLoRAManager(WorkerLoRAManager):
    """WorkerLoRAManager that manages LoRA models on the worker side.

    Uses an LRU Cache. Every request, the requested LoRAs will be loaded
    (unless they are already loaded) and least recently used LoRAs will
    be unloaded if the cache is above capacity."""

    _lora_manager_cls: Type[
        LRUCacheLoRAModelManager] = LRUCacheLoRAModelManager

    def create_lora_manager(
        self,
        model: torch.nn.Module,
    ) -> Any:
        lora_manager = create_lora_manager(
            model,
            lora_manager_cls=self._lora_manager_cls,
            max_num_seqs=self.max_num_seqs,
            vocab_size=self.vocab_size,
            lora_config=self.lora_config,
            max_num_batched_tokens=self.max_num_batched_tokens,
        )
        self._lora_manager: LRUCacheLoRAModelManager = lora_manager
        return lora_manager.model

    def _apply_loras(self, lora_requests: List[LoRARequest]) -> None:
        loras_map = {
            lora_request.lora_int_id: lora_request
            for lora_request in lora_requests if lora_request
        }
        if len(loras_map) > self._lora_manager.lora_slots:
            raise RuntimeError(
                f"Number of requested LoRAs ({len(loras_map)}) is greater "
                "than the number of GPU LoRA slots "
                f"({self._lora_manager.lora_slots}).")
        for lora in loras_map.values():
            self.add_lora(lora)

    def add_lora(self, lora_request: LoRARequest) -> bool:
        if lora_request.lora_int_id not in self.list_loras():
            # Remove before we load the new lora to save memory
            if len(self._lora_manager) + 1 > self._lora_manager.capacity:
                self._lora_manager.remove_oldest_lora()
            lora = self._load_lora(lora_request)
            loaded = self._lora_manager.add_lora(lora)
        else:
            # If the lora is already loaded, just touch it to
            # update its position in the caches
            loaded = self._lora_manager.get_lora(lora_request.lora_int_id)
        self._lora_manager.activate_lora(lora_request.lora_int_id)
        return loaded
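

# Illustrative sketch (not part of Aphrodite): swapping WorkerLoRAManager for
# LRUCacheWorkerLoRAManager in the flow above changes only eviction. Adapters
# requested again are "touched" in the cache instead of reloaded, and the
# least recently used ones are evicted once the manager exceeds its capacity.
# Defined only as an example and never called.
def _example_lru_reuse(manager: LRUCacheWorkerLoRAManager,
                       request: LoRARequest,
                       mapping: LoRAMapping) -> None:
    # First batch: the adapter is loaded from disk and activated.
    manager.set_active_loras([request], mapping)
    # Later batch with the same adapter: add_lora() finds the id in
    # list_loras(), so it only refreshes the adapter's LRU position and
    # re-activates it; no checkpoint reload happens.
    manager.set_active_loras([request], mapping)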