123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- import time
- import codecs
- from fastapi import Request
- from typing import AsyncGenerator, AsyncIterator, Optional, List, Union
- from loguru import logger
- from aphrodite.common.utils import random_uuid
- from aphrodite.engine.async_aphrodite import AsyncAphrodite
- from aphrodite.endpoints.openai.protocol import (
- ChatCompletionRequest, ChatCompletionResponse,
- ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice,
- ChatCompletionStreamResponse, ChatMessage, DeltaMessage, ErrorResponse,
- UsageInfo)
- from aphrodite.common.outputs import RequestOutput
- from aphrodite.endpoints.openai.serving_engine import OpenAIServing, LoRA
- from aphrodite.modeling.outlines_decoding import (
- get_guided_decoding_logits_processor)
class OpenAIServingChat(OpenAIServing):
    """OpenAI-compatible chat-completion handler.

    Renders ``request.messages`` with the tokenizer's chat template, submits
    the resulting prompt to the engine, and returns either a complete
    ``ChatCompletionResponse`` or an async generator of server-sent-event
    strings (``"data: ...\\n\\n"``) built from ``ChatCompletionStreamResponse``
    chunks.
    """

    def __init__(self,
                 engine: AsyncAphrodite,
                 served_model: str,
                 response_role: str,
                 lora_modules: Optional[List[LoRA]] = None,
                 chat_template=None):
        """Create the chat serving handler.

        Args:
            engine: Async engine used to generate completions.
            served_model: Model name, forwarded to ``OpenAIServing``.
            response_role: Role reported for generated messages when the
                request sets ``add_generation_prompt``.
            lora_modules: Optional LoRA modules, forwarded to
                ``OpenAIServing``.
            chat_template: Optional path to a chat-template file, or the
                template text itself (see ``_load_chat_template``).
        """
        super().__init__(engine=engine,
                         served_model=served_model,
                         lora_modules=lora_modules)
        self.response_role = response_role
        self._load_chat_template(chat_template)

    async def create_chat_completion(
        self, request: ChatCompletionRequest, raw_request: Request
    ) -> Union[ErrorResponse, AsyncGenerator[str, None],
               ChatCompletionResponse]:
        """Chat Completion API similar to OpenAI's API.

        See https://platform.openai.com/docs/api-reference/chat/create
        for the API specification. This API mimics the OpenAI ChatCompletion
        API.

        NOTE: Currently we do not support the following feature:
            - function_call (Users should implement this by themselves)

        Args:
            request: The parsed chat-completion request.
            raw_request: The underlying HTTP request, used to detect client
                disconnects in non-streaming mode.

        Returns:
            An ``ErrorResponse`` if validation, templating or tokenization
            fails; an async generator of SSE strings when
            ``request.stream`` is set; otherwise a ``ChatCompletionResponse``.
        """
        error_check_ret = await self._check_model(request)
        if error_check_ret is not None:
            return error_check_ret

        try:
            prompt = self.tokenizer.apply_chat_template(
                conversation=request.messages,
                tokenize=False,
                add_generation_prompt=request.add_generation_prompt)
        except Exception as e:
            logger.error(
                f"Error in applying chat template from request: {str(e)}")
            return self.create_error_response(str(e))

        request_id = f"cmpl-{random_uuid()}"
        try:
            token_ids = self._validate_prompt_and_tokenize(request,
                                                           prompt=prompt)
            sampling_params = request.to_sampling_params()
            lora_request = self._maybe_get_lora(request)
            guided_decode_logits_processor = (
                await get_guided_decoding_logits_processor(
                    request, self.engine.get_tokenizer()))
            if guided_decode_logits_processor:
                if sampling_params.logits_processors is None:
                    sampling_params.logits_processors = []
                sampling_params.logits_processors.append(
                    guided_decode_logits_processor)
        except ValueError as e:
            return self.create_error_response(str(e))

        result_generator = self.engine.generate(prompt, sampling_params,
                                                request_id, token_ids,
                                                lora_request)
        # Streaming response
        if request.stream:
            return self.chat_completion_stream_generator(
                request, result_generator, request_id)

        try:
            return await self.chat_completion_full_generator(
                request, raw_request, result_generator, request_id)
        except ValueError as e:
            # TODO: Use an aphrodite-specific Validation Error
            return self.create_error_response(str(e))

    def get_chat_request_role(self, request: ChatCompletionRequest) -> str:
        """Return the role to report for the generated message.

        This is ``self.response_role`` when the request asked for a
        generation prompt; otherwise the role of the last input message
        (the model continues that message).
        """
        if request.add_generation_prompt:
            return self.response_role
        return request.messages[-1]["role"]

    @staticmethod
    def _get_echo_content(request: ChatCompletionRequest, role: str) -> str:
        """Return the last message's content if its role is ``role``.

        Used for ``echo``: returns ``""`` when there are no messages,
        ``messages`` is not a list, the last message has no (or empty)
        content, or its role differs from ``role``.
        """
        messages = request.messages
        if not messages or not isinstance(messages, list):
            return ""
        last_message = messages[-1]
        if last_message.get("content") and last_message.get("role") == role:
            return last_message["content"]
        return ""

    async def chat_completion_stream_generator(
        self, request: ChatCompletionRequest,
        result_generator: AsyncIterator[RequestOutput], request_id: str
    ) -> AsyncGenerator[str, None]:
        """Yield SSE ``data:`` lines for a streaming chat completion.

        For each of the ``request.n`` choices this emits: one chunk
        carrying the role, an optional chunk echoing the last input message
        (when ``request.echo`` is set), incremental content deltas, and one
        final chunk with ``finish_reason`` and usage. A ``ValueError`` raised
        while iterating is emitted as a streaming error message. The stream
        always ends with ``data: [DONE]``.
        """
        model_name = request.model
        # The OpenAI ``created`` field is a Unix timestamp; time.monotonic()
        # has an arbitrary reference point and is not suitable here.
        created_time = int(time.time())
        chunk_object_type = "chat.completion.chunk"
        first_iteration = True

        # Per-choice (request.n index) streaming state.
        previous_texts = [""] * request.n
        previous_num_tokens = [0] * request.n
        finish_reason_sent = [False] * request.n
        try:
            async for res in result_generator:
                res: RequestOutput
                # We need to do it here, because if there are exceptions in
                # the result_generator, it needs to be sent as the FIRST
                # response (by the try...except).
                if first_iteration:
                    # Send first response for each request.n (index) with
                    # the role.
                    role = self.get_chat_request_role(request)
                    for i in range(request.n):
                        choice_data = ChatCompletionResponseStreamChoice(
                            index=i,
                            delta=DeltaMessage(role=role),
                            logprobs=None,
                            finish_reason=None)
                        chunk = ChatCompletionStreamResponse(
                            id=request_id,
                            object=chunk_object_type,
                            created=created_time,
                            choices=[choice_data],
                            model=model_name)
                        data = chunk.model_dump_json(exclude_unset=True)
                        yield f"data: {data}\n\n"

                    # Send response to echo the input portion of the last
                    # message.
                    if request.echo:
                        last_msg_content = self._get_echo_content(
                            request, role)
                        if last_msg_content:
                            for i in range(request.n):
                                # logprobs belongs on the choice, not on the
                                # response envelope.
                                choice_data = (
                                    ChatCompletionResponseStreamChoice(
                                        index=i,
                                        delta=DeltaMessage(
                                            content=last_msg_content),
                                        logprobs=None,
                                        finish_reason=None))
                                chunk = ChatCompletionStreamResponse(
                                    id=request_id,
                                    object=chunk_object_type,
                                    created=created_time,
                                    choices=[choice_data],
                                    model=model_name)
                                data = chunk.model_dump_json(
                                    exclude_unset=True)
                                yield f"data: {data}\n\n"
                    first_iteration = False

                for output in res.outputs:
                    i = output.index

                    if finish_reason_sent[i]:
                        continue

                    # Only the tokens/text produced since the last chunk.
                    delta_token_ids = output.token_ids[previous_num_tokens[i]:]
                    top_logprobs = output.logprobs[
                        previous_num_tokens[i]:] if output.logprobs else None

                    if request.logprobs:
                        logprobs = self._create_logprobs(
                            token_ids=delta_token_ids,
                            top_logprobs=top_logprobs,
                            num_output_top_logprobs=request.logprobs,
                            initial_text_offset=len(previous_texts[i]),
                        )
                    else:
                        logprobs = None

                    delta_text = output.text[len(previous_texts[i]):]
                    previous_texts[i] = output.text
                    previous_num_tokens[i] = len(output.token_ids)

                    if output.finish_reason is None:
                        # Send token-by-token response for each request.n
                        choice_data = ChatCompletionResponseStreamChoice(
                            index=i,
                            delta=DeltaMessage(content=delta_text),
                            logprobs=logprobs,
                            finish_reason=None)
                        chunk = ChatCompletionStreamResponse(
                            id=request_id,
                            object=chunk_object_type,
                            created=created_time,
                            choices=[choice_data],
                            model=model_name)
                        data = chunk.model_dump_json(exclude_unset=True)
                        yield f"data: {data}\n\n"
                    else:
                        # Send the finish response for each request.n only
                        # once, including usage for this choice.
                        prompt_tokens = len(res.prompt_token_ids)
                        final_usage = UsageInfo(
                            prompt_tokens=prompt_tokens,
                            completion_tokens=previous_num_tokens[i],
                            total_tokens=prompt_tokens +
                            previous_num_tokens[i],
                        )
                        choice_data = ChatCompletionResponseStreamChoice(
                            index=i,
                            delta=DeltaMessage(content=delta_text),
                            logprobs=logprobs,
                            finish_reason=output.finish_reason)
                        chunk = ChatCompletionStreamResponse(
                            id=request_id,
                            object=chunk_object_type,
                            created=created_time,
                            choices=[choice_data],
                            model=model_name)
                        chunk.usage = final_usage
                        data = chunk.model_dump_json(exclude_unset=True,
                                                     exclude_none=True)
                        yield f"data: {data}\n\n"
                        finish_reason_sent[i] = True
        except ValueError as e:
            # TODO: Use an aphrodite-specific Validation Error
            data = self.create_streaming_error_response(str(e))
            yield f"data: {data}\n\n"
        # Send the final done message after all response.n are finished
        yield "data: [DONE]\n\n"

    async def chat_completion_full_generator(
            self, request: ChatCompletionRequest, raw_request: Request,
            result_generator: AsyncIterator[RequestOutput],
            request_id: str) -> Union[ErrorResponse, ChatCompletionResponse]:
        """Consume the whole result stream and build one response.

        Aborts the engine request and returns an error response if the
        client disconnects before generation finishes. Otherwise builds one
        choice per engine output from the last ``RequestOutput`` received,
        optionally prefixed with the echoed last message, and reports
        aggregate token usage.
        """
        model_name = request.model
        # Unix timestamp, as required by the OpenAI ``created`` field.
        created_time = int(time.time())
        final_res: Optional[RequestOutput] = None

        async for res in result_generator:
            if await raw_request.is_disconnected():
                # Abort the request if the client disconnects.
                await self.engine.abort(request_id)
                return self.create_error_response("Client disconnected")
            final_res = res
        assert final_res is not None

        choices = []
        role = self.get_chat_request_role(request)
        for output in final_res.outputs:
            if request.logprobs:
                logprobs = self._create_logprobs(
                    token_ids=output.token_ids,
                    top_logprobs=output.logprobs,
                    num_output_top_logprobs=request.logprobs,
                )
            else:
                logprobs = None

            choice_data = ChatCompletionResponseChoice(
                index=output.index,
                message=ChatMessage(role=role, content=output.text),
                logprobs=logprobs,
                finish_reason=output.finish_reason,
            )
            choices.append(choice_data)

        if request.echo:
            last_msg_content = self._get_echo_content(request, role)
            for choice in choices:
                choice.message.content = (last_msg_content +
                                          choice.message.content)

        num_prompt_tokens = len(final_res.prompt_token_ids)
        num_generated_tokens = sum(
            len(output.token_ids) for output in final_res.outputs)
        usage = UsageInfo(
            prompt_tokens=num_prompt_tokens,
            completion_tokens=num_generated_tokens,
            total_tokens=num_prompt_tokens + num_generated_tokens,
        )
        response = ChatCompletionResponse(
            id=request_id,
            created=created_time,
            model=model_name,
            choices=choices,
            usage=usage,
        )

        return response

    def _load_chat_template(self, chat_template):
        """Install ``chat_template`` on the tokenizer, if given.

        ``chat_template`` is first treated as a file path. If opening it
        raises ``OSError``, the value itself is used as the template text,
        after unicode-escape decoding. With no template supplied, the
        tokenizer's own template (if any) is kept and a message is logged.
        """
        if chat_template is not None:
            try:
                with open(chat_template, "r", encoding="utf-8") as f:
                    self.tokenizer.chat_template = f.read()
            except OSError:
                # Not a readable file: treat the argument as the template
                # itself, decoding it so escape sequences (e.g. "\n") are
                # interpreted correctly.
                self.tokenizer.chat_template = codecs.decode(
                    chat_template, "unicode_escape")

            logger.info("Using the supplied chat template.")
        elif self.tokenizer.chat_template is not None:
            logger.info("Using the default chat template")
        else:
            logger.warning(
                "No chat template provided. Chat API will not work.")
|