1
0

serving_engine.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. import asyncio
  2. import json
  3. from dataclasses import dataclass
  4. from http import HTTPStatus
  5. from typing import Dict, List, Optional, Tuple, Union
  6. from loguru import logger
  7. from pydantic import conint
  8. from aphrodite.common.sequence import Logprob
  9. from aphrodite.endpoints.openai.protocol import (
  10. ChatCompletionRequest, CompletionRequest, EmbeddingRequest, ErrorResponse,
  11. LogProbs, ModelCard, ModelList, ModelPermission, Prompt)
  12. from aphrodite.engine.async_aphrodite import AsyncAphrodite
  13. from aphrodite.lora.request import LoRARequest
  14. from aphrodite.transformers_utils.tokenizer import get_tokenizer
  15. @dataclass
  16. class LoRA:
  17. name: str
  18. local_path: str
  19. class OpenAIServing:
  20. def __init__(self,
  21. engine: AsyncAphrodite,
  22. served_model_names: List[str],
  23. lora_modules=Optional[List[LoRA]]):
  24. self.engine = engine
  25. self.served_model_names = served_model_names
  26. if lora_modules is None:
  27. self.lora_requests = []
  28. else:
  29. self.lora_requests = [
  30. LoRARequest(
  31. lora_name=lora.name,
  32. lora_int_id=i,
  33. lora_local_path=lora.local_path,
  34. ) for i, lora in enumerate(lora_modules, start=1)
  35. ]
  36. self.max_model_len = 0
  37. self.tokenizer = None
  38. try:
  39. event_loop = asyncio.get_running_loop()
  40. except RuntimeError:
  41. event_loop = None
  42. if event_loop is not None and event_loop.is_running():
  43. # If the current is instanced by Ray Serve,
  44. # there is already a running event loop
  45. event_loop.create_task(self._post_init())
  46. else:
  47. # When using single Aphrodite without engine_use_ray
  48. asyncio.run(self._post_init())
  49. async def _post_init(self):
  50. engine_model_config = await self.engine.get_model_config()
  51. self.max_model_len = engine_model_config.max_model_len
  52. # A separate tokenizer to map token IDs to strings.
  53. self.tokenizer = get_tokenizer(
  54. engine_model_config.tokenizer,
  55. tokenizer_mode=engine_model_config.tokenizer_mode,
  56. trust_remote_code=engine_model_config.trust_remote_code,
  57. revision=engine_model_config.revision,
  58. truncation_side="left")
  59. async def show_available_models(self) -> ModelList:
  60. """Show available models. Right now we only have one model."""
  61. model_cards = [
  62. ModelCard(id=served_model_name,
  63. root=self.served_model_names[0],
  64. permission=[ModelPermission()])
  65. for served_model_name in self.served_model_names
  66. ]
  67. lora_cards = [
  68. ModelCard(id=lora.lora_name,
  69. root=self.served_model_names[0],
  70. permission=[ModelPermission()])
  71. for lora in self.lora_requests
  72. ]
  73. model_cards.extend(lora_cards)
  74. return ModelList(data=model_cards)
  75. async def tokenize(self, prompt: Prompt):
  76. """Tokenize a given prompt."""
  77. tokenized_prompt = self.tokenizer.tokenize(prompt.prompt)
  78. token_ids = self.tokenizer.convert_tokens_to_ids(tokenized_prompt)
  79. return {"value": len(tokenized_prompt), "ids": token_ids}
  80. async def detokenize(self, token_ids: List[int]):
  81. """Detokenize a given list of token IDs."""
  82. tokens = self.tokenizer.convert_ids_to_tokens(token_ids)
  83. detokenized_text = self.tokenizer.convert_tokens_to_string(tokens)
  84. return {"value": detokenized_text}
  85. def _create_logprobs(
  86. self,
  87. token_ids: List[int],
  88. top_logprobs: Optional[List[Optional[Dict[int, Logprob]]]] = None,
  89. num_output_top_logprobs: Optional[int] = None,
  90. initial_text_offset: int = 0,
  91. ) -> LogProbs:
  92. """Create OpenAI-style logprobs."""
  93. logprobs = LogProbs()
  94. last_token_len = 0
  95. if num_output_top_logprobs:
  96. logprobs.top_logprobs = []
  97. for i, token_id in enumerate(token_ids):
  98. step_top_logprobs = top_logprobs[i]
  99. if step_top_logprobs is None:
  100. token = self.tokenizer.decode(token_id)
  101. logprobs.tokens.append(token)
  102. logprobs.token_logprobs.append(None)
  103. logprobs.top_logprobs.append(None)
  104. else:
  105. token_logprob = step_top_logprobs[token_id].logprob
  106. token = step_top_logprobs[token_id].decoded_token
  107. logprobs.tokens.append(token)
  108. token_logprob = max(token_logprob, -9999.0)
  109. logprobs.token_logprobs.append(token_logprob)
  110. if num_output_top_logprobs:
  111. logprobs.top_logprobs.append({
  112. # Convert float("-inf") to the
  113. # JSON-serializable float that OpenAI uses
  114. p.decoded_token: max(p.logprob, -9999.0)
  115. for i, p in step_top_logprobs.items()
  116. } if step_top_logprobs else None)
  117. # TODO: Check if this is still needed
  118. if logprobs.top_logprobs:
  119. logprobs.top_logprobs = [{
  120. k: v if v > -1000 else -1000
  121. for k, v in top_logprob.items()
  122. } for top_logprob in logprobs.top_logprobs
  123. if top_logprob is not None
  124. ] # noqa: E501
  125. if len(logprobs.text_offset) == 0:
  126. logprobs.text_offset.append(initial_text_offset)
  127. else:
  128. logprobs.text_offset.append(logprobs.text_offset[-1] +
  129. last_token_len)
  130. last_token_len = len(token)
  131. return logprobs
  132. def create_error_response(
  133. self,
  134. message: str,
  135. err_type: str = "BadRequestError",
  136. status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse:
  137. return ErrorResponse(message=message,
  138. type=err_type,
  139. code=status_code.value)
  140. def create_streaming_error_response(
  141. self,
  142. message: str,
  143. err_type: str = "BadRequestError",
  144. status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> str:
  145. json_str = json.dumps({
  146. "error":
  147. self.create_error_response(message=message,
  148. err_type=err_type,
  149. status_code=status_code).model_dump()
  150. })
  151. return json_str
  152. async def _check_model(self, request) -> Optional[ErrorResponse]:
  153. if request.model in self.served_model_names:
  154. return
  155. if request.model in [lora.lora_name for lora in self.lora_requests]:
  156. return
  157. return self.create_error_response(
  158. message=f"The model `{request.model}` does not exist.",
  159. err_type="NotFoundError",
  160. status_code=HTTPStatus.NOT_FOUND)
  161. def add_lora(self, lora: LoRA):
  162. if lora.name in [
  163. existing_lora.lora_name for existing_lora in self.lora_requests
  164. ]:
  165. logger.error(f"LoRA with name {lora.name} already exists.")
  166. return
  167. self.lora_requests.append(
  168. LoRARequest(
  169. lora_name=lora.name,
  170. lora_int_id=len(self.lora_requests) + 1,
  171. lora_local_path=lora.local_path,
  172. ))
  173. def remove_lora(self, lora_name: str):
  174. self.lora_requests = [
  175. lora for lora in self.lora_requests if lora.lora_name != lora_name
  176. ]
  177. def _maybe_get_lora(self, request) -> Optional[LoRARequest]:
  178. if request.model in self.served_model_names:
  179. return
  180. for lora in self.lora_requests:
  181. if request.model == lora.lora_name:
  182. return lora
  183. # if _check_model has been called earlier, this will be unreachable
  184. raise ValueError("The model `{request.model}` does not exist.")
  185. def _validate_prompt_and_tokenize(
  186. self,
  187. request: Union[ChatCompletionRequest, CompletionRequest,
  188. EmbeddingRequest],
  189. prompt: Optional[str] = None,
  190. prompt_ids: Optional[List[int]] = None,
  191. truncate_prompt_tokens: Optional[conint(ge=1)] = None
  192. ) -> Tuple[List[int], str]:
  193. if not (prompt or prompt_ids):
  194. raise ValueError("Either prompt or prompt_ids should be provided.")
  195. if (prompt and prompt_ids):
  196. raise ValueError(
  197. "Only one of prompt or prompt_ids should be provided.")
  198. if prompt_ids is None:
  199. tokenizer_kwargs = {} if truncate_prompt_tokens is None else {
  200. "truncation": True,
  201. "max_length": truncate_prompt_tokens,
  202. }
  203. input_ids = self.tokenizer(prompt, **tokenizer_kwargs).input_ids
  204. elif truncate_prompt_tokens is not None:
  205. input_ids = prompt_ids[-truncate_prompt_tokens:]
  206. else:
  207. input_ids = prompt_ids
  208. input_text = prompt if prompt is not None else self.tokenizer.decode(
  209. prompt_ids)
  210. token_num = len(input_ids)
  211. # Note: EmbeddingRequest doesn't have max_tokens
  212. if isinstance(request, EmbeddingRequest):
  213. if token_num > self.max_model_len:
  214. raise ValueError(
  215. f"This model's maximum context length is "
  216. f"{self.max_model_len} tokens. However, you requested "
  217. f"{token_num} tokens in the input for embedding "
  218. f"generation. Please reduce the length of the input.", )
  219. return input_ids, input_text
  220. if request.max_tokens is None:
  221. request.max_tokens = self.max_model_len - token_num
  222. if token_num + request.max_tokens > self.max_model_len:
  223. raise ValueError(
  224. f"This model's maximum context length is "
  225. f"{self.max_model_len} tokens. However, you requested "
  226. f"{request.max_tokens + token_num} tokens "
  227. f"({token_num} in the messages, "
  228. f"{request.max_tokens} in the completion). "
  229. f"Please reduce the length of the messages or completion.", )
  230. else:
  231. return input_ids, input_text