123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- import asyncio
- import json
- from dataclasses import dataclass
- from http import HTTPStatus
- from typing import Dict, List, Optional, Union
- from loguru import logger
- from aphrodite.transformers_utils.tokenizer import get_tokenizer
- from aphrodite.engine.async_aphrodite import AsyncAphrodite
- from aphrodite.endpoints.openai.protocol import (CompletionRequest,
- ChatCompletionRequest,
- ErrorResponse, LogProbs,
- ModelCard, ModelList,
- ModelPermission, Prompt)
- from aphrodite.lora.request import LoRARequest
- from aphrodite.common.sequence import Logprob
- @dataclass
- class LoRA:
- name: str
- local_path: str
- class OpenAIServing:
- def __init__(self,
- engine: AsyncAphrodite,
- served_model: str,
- lora_modules=Optional[List[LoRA]]):
- self.engine = engine
- self.served_model = served_model
- if lora_modules is None:
- self.lora_requests = []
- else:
- self.lora_requests = [
- LoRARequest(
- lora_name=lora.name,
- lora_int_id=i,
- lora_local_path=lora.local_path,
- ) for i, lora in enumerate(lora_modules, start=1)
- ]
- self.max_model_len = 0
- self.tokenizer = None
- try:
- event_loop = asyncio.get_running_loop()
- except RuntimeError:
- event_loop = None
- if event_loop is not None and event_loop.is_running(
- ): # If the current is instanced by Ray Serve, there is already
- # a running event loop
- event_loop.create_task(self._post_init())
- else: # When using single Aphrodite without engine_use_ray
- asyncio.run(self._post_init())
- async def _post_init(self):
- engine_model_config = await self.engine.get_model_config()
- self.max_model_len = engine_model_config.max_model_len
- # A separate tokenizer to map token IDs to strings.
- self.tokenizer = get_tokenizer(
- engine_model_config.tokenizer,
- tokenizer_mode=engine_model_config.tokenizer_mode,
- trust_remote_code=engine_model_config.trust_remote_code)
- async def show_available_models(self) -> ModelList:
- """Show available models. Right now we only have one model."""
- model_cards = [
- ModelCard(id=self.served_model,
- root=self.served_model,
- permission=[ModelPermission()])
- ]
- lora_cards = [
- ModelCard(id=lora.lora_name,
- root=self.served_model,
- permission=[ModelPermission()])
- for lora in self.lora_requests
- ]
- model_cards.extend(lora_cards)
- return ModelList(data=model_cards)
- async def tokenize(self, prompt: Prompt):
- """Tokenize a given prompt."""
- tokenized_prompt = self.tokenizer.tokenize(prompt.prompt)
- token_ids = self.tokenizer.convert_tokens_to_ids(tokenized_prompt)
- return {"value": len(tokenized_prompt), "ids": token_ids}
- async def detokenize(self, token_ids: List[int]):
- """Detokenize a given list of token IDs."""
- tokens = self.tokenizer.convert_ids_to_tokens(token_ids)
- detokenized_text = self.tokenizer.convert_tokens_to_string(tokens)
- return {"value": detokenized_text}
- def _create_logprobs(
- self,
- token_ids: List[int],
- top_logprobs: Optional[List[Optional[Dict[int, Logprob]]]] = None,
- num_output_top_logprobs: Optional[int] = None,
- initial_text_offset: int = 0,
- ) -> LogProbs:
- """Create OpenAI-style logprobs."""
- logprobs = LogProbs()
- last_token_len = 0
- if num_output_top_logprobs:
- logprobs.top_logprobs = []
- for i, token_id in enumerate(token_ids):
- step_top_logprobs = top_logprobs[i]
- if step_top_logprobs is not None:
- token_logprob = step_top_logprobs[token_id].logprob
- else:
- token_logprob = None
- token = step_top_logprobs[token_id].decoded_token
- logprobs.tokens.append(token)
- logprobs.token_logprobs.append(token_logprob)
- if len(logprobs.text_offset) == 0:
- logprobs.text_offset.append(initial_text_offset)
- else:
- logprobs.text_offset.append(logprobs.text_offset[-1] +
- last_token_len)
- last_token_len = len(token)
- if num_output_top_logprobs:
- logprobs.top_logprobs.append({
- p.decoded_token: p.logprob
- for i, p in step_top_logprobs.items()
- } if step_top_logprobs else None)
- logprobs.top_logprobs = [{
- k: v if v > -1000 else -1000
- for k, v in top_logprob.items()
- } for top_logprob in logprobs.top_logprobs if top_logprob is not None]
- return logprobs
- def create_error_response(
- self,
- message: str,
- err_type: str = "BadRequestError",
- status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse:
- return ErrorResponse(message=message,
- type=err_type,
- code=status_code.value)
- def create_streaming_error_response(
- self,
- message: str,
- err_type: str = "BadRequestError",
- status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> str:
- json_str = json.dumps({
- "error":
- self.create_error_response(message=message,
- err_type=err_type,
- status_code=status_code).model_dump()
- })
- return json_str
- async def _check_model(self, request) -> Optional[ErrorResponse]:
- if request.model == self.served_model:
- return
- if request.model in [lora.lora_name for lora in self.lora_requests]:
- return
- return self.create_error_response(
- message=f"The model `{request.model}` does not exist.",
- err_type="NotFoundError",
- status_code=HTTPStatus.NOT_FOUND)
- def add_lora(self, lora: LoRA):
- if lora.name in [
- existing_lora.lora_name for existing_lora in self.lora_requests
- ]:
- logger.error(f"LoRA with name {lora.name} already exists.")
- return
- self.lora_requests.append(
- LoRARequest(
- lora_name=lora.name,
- lora_int_id=len(self.lora_requests) + 1,
- lora_local_path=lora.local_path,
- ))
- def remove_lora(self, lora_name: str):
- self.lora_requests = [
- lora for lora in self.lora_requests if lora.lora_name != lora_name
- ]
- def _maybe_get_lora(self, request) -> Optional[LoRARequest]:
- if request.model == self.served_model:
- return
- for lora in self.lora_requests:
- if request.model == lora.lora_name:
- return lora
- # if _check_model has been called earlier, this will be unreachable
- raise ValueError("The model `{request.model}` does not exist.")
- def _validate_prompt_and_tokenize(
- self,
- request: Union[ChatCompletionRequest, CompletionRequest],
- prompt: Optional[str] = None,
- prompt_ids: Optional[List[int]] = None) -> List[int]:
- if not (prompt or prompt_ids):
- raise ValueError("Either prompt or prompt_ids should be provided.")
- if (prompt and prompt_ids):
- raise ValueError(
- "Only one of prompt or prompt_ids should be provided.")
- input_ids = prompt_ids if prompt_ids is not None else self.tokenizer(
- prompt).input_ids
- token_num = len(input_ids)
- if request.max_tokens is None:
- request.max_tokens = self.max_model_len - token_num
- if token_num + request.max_tokens > self.max_model_len:
- raise ValueError(
- f"This model's maximum context length is {self.max_model_len} "
- "tokens. "
- f"However, you requested {request.max_tokens + token_num} "
- f"tokens ({token_num} in the messages, "
- f"{request.max_tokens} in the completion). "
- f"Please reduce the length of the messages or completion.", )
- else:
- return input_ids
|