import asyncio
import json
from dataclasses import dataclass
from http import HTTPStatus
from typing import Dict, List, Optional, Union

from aphrodite.transformers_utils.tokenizer import get_tokenizer
from aphrodite.engine.async_aphrodite import AsyncAphrodite
from aphrodite.endpoints.openai.protocol import (CompletionRequest,
                                                 ChatCompletionRequest,
                                                 ErrorResponse, LogProbs,
                                                 ModelCard, ModelList,
                                                 ModelPermission, Prompt)
from aphrodite.lora.request import LoRARequest
from aphrodite.common.sequence import Logprob


@dataclass
class LoRA:
    """A LoRA adapter to expose through the API."""

    # Name under which the adapter is served (becomes `lora_name`).
    name: str
    # Path handed to `LoRARequest` as `lora_local_path`.
    local_path: str


class OpenAIServing:
    """Shared helpers for the OpenAI-compatible endpoints.

    Holds the engine handle, the served model name, the registered LoRA
    requests, and (once `_post_init` has run) the model's maximum context
    length and a tokenizer.
    """

    def __init__(self,
                 engine: AsyncAphrodite,
                 served_model: str,
                 lora_modules: Optional[List[LoRA]] = None):
        """Store the engine and schedule the asynchronous initialisation.

        Args:
            engine: Engine used to look up the model configuration.
            served_model: Name clients must pass as `model` for the base
                model.
            lora_modules: Optional LoRA adapters to expose. Each one is
                assigned an integer id starting at 1, in list order.
        """
        self.engine = engine
        self.served_model = served_model

        # NOTE: the default used to be the expression `Optional[List[LoRA]]`
        # itself (`=` instead of `:`), so omitting the argument never took
        # this branch.
        if lora_modules is None:
            self.lora_requests = []
        else:
            self.lora_requests = [
                LoRARequest(
                    lora_name=lora.name,
                    lora_int_id=i,
                    lora_local_path=lora.local_path,
                ) for i, lora in enumerate(lora_modules, start=1)
            ]

        # Placeholders until `_post_init` has completed.
        self.max_model_len = 0
        self.tokenizer = None

        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            event_loop = None

        if event_loop is not None and event_loop.is_running():
            # A loop is already running (e.g. when instantiated by Ray
            # Serve), so the initialisation is scheduled on it. Keep a
            # reference to the task: the event loop only holds weak
            # references, and an unreferenced task may be garbage-collected
            # before it finishes.
            self._post_init_task = event_loop.create_task(self._post_init())
        else:
            # No running loop (single Aphrodite without engine_use_ray):
            # run the initialisation to completion right here.
            self._post_init_task = None
            asyncio.run(self._post_init())

    async def _post_init(self):
        """Fetch the model config and set `max_model_len` and `tokenizer`."""
        engine_model_config = await self.engine.get_model_config()
        self.max_model_len = engine_model_config.max_model_len

        # A separate tokenizer to map token IDs to strings.
        self.tokenizer = get_tokenizer(
            engine_model_config.tokenizer,
            tokenizer_mode=engine_model_config.tokenizer_mode,
            trust_remote_code=engine_model_config.trust_remote_code)

    async def show_available_models(self) -> ModelList:
        """List the served base model followed by one card per LoRA.

        Every LoRA card uses the adapter name as its id and the base model
        as its root.
        """
        model_cards = [
            ModelCard(id=self.served_model,
                      root=self.served_model,
                      permission=[ModelPermission()])
        ]
        model_cards.extend(
            ModelCard(id=lora.lora_name,
                      root=self.served_model,
                      permission=[ModelPermission()])
            for lora in self.lora_requests)
        return ModelList(data=model_cards)

    async def tokenize(self, prompt: Prompt):
        """Tokenize `prompt.prompt`.

        Returns:
            A dict with `"value"` (number of tokens) and `"ids"` (the
            token ids).
        """
        tokenized_prompt = self.tokenizer.tokenize(prompt.prompt)
        token_ids = self.tokenizer.convert_tokens_to_ids(tokenized_prompt)
        return {"value": len(tokenized_prompt), "ids": token_ids}

    async def detokenize(self, token_ids: List[int]):
        """Turn token ids back into text.

        Returns:
            A dict whose `"value"` is the detokenized string.
        """
        tokens = self.tokenizer.convert_ids_to_tokens(token_ids)
        detokenized_text = self.tokenizer.convert_tokens_to_string(tokens)
        return {"value": detokenized_text}

    def _create_logprobs(
        self,
        token_ids: List[int],
        top_logprobs: Optional[List[Optional[Dict[int, Logprob]]]] = None,
        num_output_top_logprobs: Optional[int] = None,
        initial_text_offset: int = 0,
    ) -> LogProbs:
        """Create OpenAI-style logprobs.

        Args:
            token_ids: Token ids, one per step.
            top_logprobs: Per-step mapping from token id to `Logprob`. An
                entry (or the whole list) may be None, in which case the
                step has no logprob and its text is decoded with the
                tokenizer.
            num_output_top_logprobs: When truthy, `top_logprobs` is filled
                in on the result as well.
            initial_text_offset: Text offset assigned to the first token.

        Returns:
            A `LogProbs` with tokens, token logprobs and text offsets (and
            per-step top logprobs when requested).
        """
        logprobs = LogProbs()
        last_token_len = 0
        if num_output_top_logprobs:
            logprobs.top_logprobs = []

        for i, token_id in enumerate(token_ids):
            step_top_logprobs = (top_logprobs[i]
                                 if top_logprobs is not None else None)
            if step_top_logprobs is None:
                # No logprob info for this step: there is no `Logprob`
                # entry to read the decoded text from, so decode the id
                # directly instead of dereferencing None.
                token = self.tokenizer.decode(token_id)
                token_logprob = None
            else:
                sampled = step_top_logprobs[token_id]
                token = sampled.decoded_token
                token_logprob = sampled.logprob

            logprobs.tokens.append(token)
            logprobs.token_logprobs.append(token_logprob)

            # Each offset is the previous offset plus the previous token's
            # length; the very first one is the caller-supplied start.
            if not logprobs.text_offset:
                logprobs.text_offset.append(initial_text_offset)
            else:
                logprobs.text_offset.append(logprobs.text_offset[-1] +
                                            last_token_len)
            last_token_len = len(token)

            if num_output_top_logprobs:
                logprobs.top_logprobs.append({
                    p.decoded_token: p.logprob
                    for p in step_top_logprobs.values()
                } if step_top_logprobs else None)
        return logprobs

    def create_error_response(
            self,
            message: str,
            err_type: str = "BadRequestError",
            status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse:
        """Build an `ErrorResponse` carrying the numeric status code."""
        return ErrorResponse(message=message,
                             type=err_type,
                             code=status_code.value)

    def create_streaming_error_response(
            self,
            message: str,
            err_type: str = "BadRequestError",
            status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> str:
        """Build the same error as a JSON string under an `"error"` key."""
        json_str = json.dumps({
            "error":
            self.create_error_response(message=message,
                                       err_type=err_type,
                                       status_code=status_code).model_dump()
        })
        return json_str

    async def _check_model(self, request) -> Optional[ErrorResponse]:
        """Return None if `request.model` is served, else a 404 error."""
        if request.model == self.served_model:
            return
        if any(request.model == lora.lora_name
               for lora in self.lora_requests):
            return
        return self.create_error_response(
            message=f"The model `{request.model}` does not exist.",
            err_type="NotFoundError",
            status_code=HTTPStatus.NOT_FOUND)

    def _maybe_get_lora(self, request) -> Optional[LoRARequest]:
        """Return the LoRA request matching `request.model`, if any.

        Returns:
            None for the base model, otherwise the matching `LoRARequest`.

        Raises:
            ValueError: If the name matches neither the base model nor a
                registered LoRA.
        """
        if request.model == self.served_model:
            return
        for lora in self.lora_requests:
            if request.model == lora.lora_name:
                return lora
        # if _check_model has been called earlier, this will be unreachable
        raise ValueError(f"The model `{request.model}` does not exist.")

    def _validate_prompt_and_tokenize(
            self,
            request: Union[ChatCompletionRequest, CompletionRequest],
            prompt: Optional[str] = None,
            prompt_ids: Optional[List[int]] = None) -> List[int]:
        """Tokenize the prompt if needed and enforce the context limit.

        Exactly one of `prompt` and `prompt_ids` must be non-empty. When
        `request.max_tokens` is None it is set, on the request object
        itself, to the room left in the context window.

        Returns:
            The prompt token ids.

        Raises:
            ValueError: If neither or both inputs are given, or if the
                prompt plus the requested completion exceeds
                `max_model_len`.
        """
        if not (prompt or prompt_ids):
            raise ValueError("Either prompt or prompt_ids should be provided.")
        if prompt and prompt_ids:
            raise ValueError(
                "Only one of prompt or prompt_ids should be provided.")

        # Truthiness rather than `is not None`, so an empty id list passed
        # alongside a real prompt does not silently replace the prompt.
        if prompt_ids:
            input_ids = prompt_ids
        else:
            input_ids = self.tokenizer(prompt).input_ids
        token_num = len(input_ids)

        if request.max_tokens is None:
            # Without this guard an over-long prompt would get a negative
            # max_tokens, and the sum check below would then pass.
            if token_num > self.max_model_len:
                raise ValueError(
                    f"This model's maximum context length is {self.max_model_len} tokens. "
                    f"However, you requested {token_num} tokens in the messages. "
                    f"Please reduce the length of the messages.")
            request.max_tokens = self.max_model_len - token_num

        if token_num + request.max_tokens > self.max_model_len:
            raise ValueError(
                f"This model's maximum context length is {self.max_model_len} tokens. "
                f"However, you requested {request.max_tokens + token_num} tokens "
                f"({token_num} in the messages, "
                f"{request.max_tokens} in the completion). "
                f"Please reduce the length of the messages or completion.")
        return input_ids