serving_completions.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. import asyncio
  2. import time
  3. from fastapi import Request
  4. from typing import AsyncGenerator, AsyncIterator, Callable, List, Optional, Dict, Tuple
  5. from aphrodite.common.utils import random_uuid
  6. from aphrodite.engine.async_aphrodite import AsyncAphrodite
  7. from aphrodite.endpoints.openai.protocol import (
  8. CompletionRequest,
  9. CompletionResponse,
  10. CompletionResponseChoice,
  11. CompletionResponseStreamChoice,
  12. CompletionStreamResponse,
  13. LogProbs,
  14. UsageInfo,
  15. )
  16. from aphrodite.common.outputs import RequestOutput
  17. from aphrodite.endpoints.openai.serving_engine import OpenAIServing, LoRA
  18. from aphrodite.modeling.outlines_decoding import get_guided_decoding_logits_processor
  19. TypeTokenIDs = List[int]
  20. TypeTopLogProbs = List[Optional[Dict[int, float]]]
  21. TypeCreateLogProbsFn = Callable[
  22. [TypeTokenIDs, TypeTopLogProbs, Optional[int], int], LogProbs]
  23. def parse_prompt_format(prompt) -> Tuple[bool, list]:
  24. # get the prompt, openai supports the following
  25. # "a string, array of strings, array of tokens, or array of token arrays."
  26. prompt_is_tokens = False
  27. prompts = [prompt] # case 1: a string
  28. if isinstance(prompt, list):
  29. if len(prompt) == 0:
  30. raise ValueError("please provide at least one prompt")
  31. elif isinstance(prompt[0], str):
  32. prompt_is_tokens = False
  33. prompts = prompt # case 2: array of strings
  34. elif isinstance(prompt[0], int):
  35. prompt_is_tokens = True
  36. prompts = [prompt] # case 3: array of tokens
  37. elif isinstance(prompt[0], list) and isinstance(prompt[0][0], int):
  38. prompt_is_tokens = True
  39. prompts = prompt # case 4: array of token arrays
  40. else:
  41. raise ValueError(
  42. "prompt must be a string, array of strings, array of tokens, or array of token arrays"
  43. )
  44. return prompt_is_tokens, prompts
  45. def merge_async_iterators(*iterators):
  46. """Merge multiple asynchronous iterators into a single iterator.
  47. This method handle the case where some iterators finish before others.
  48. When it yields, it yields a tuple (i, item) where i is the index of the
  49. iterator that yields the item.
  50. """
  51. queue = asyncio.Queue()
  52. finished = [False] * len(iterators)
  53. async def producer(i, iterator):
  54. try:
  55. async for item in iterator:
  56. await queue.put((i, item))
  57. except Exception as e:
  58. await queue.put(e)
  59. finished[i] = True
  60. _tasks = [
  61. asyncio.create_task(producer(i, iterator))
  62. for i, iterator in enumerate(iterators)
  63. ]
  64. async def consumer():
  65. while not all(finished) or not queue.empty():
  66. item = await queue.get()
  67. if isinstance(item, Exception):
  68. raise item
  69. yield item
  70. await asyncio.gather(*_tasks)
  71. return consumer()
  72. class OpenAIServingCompletion(OpenAIServing):
  73. def __init__(self,
  74. engine: AsyncAphrodite,
  75. served_model: str,
  76. lora_modules: Optional[List[LoRA]] = None):
  77. super().__init__(engine=engine,
  78. served_model=served_model,
  79. lora_modules=lora_modules)
  80. async def create_completion(self, request: CompletionRequest,
  81. raw_request: Request):
  82. """Completion API similar to OpenAI's API.
  83. See https://platform.openai.com/docs/api-reference/completions/create
  84. for the API specification. This API mimics the OpenAI Completion API.
  85. NOTE: Currently we do not support the following feature:
  86. - suffix (the language models we currently support do not support
  87. suffix)
  88. """
  89. error_check_ret = await self._check_model(request)
  90. if error_check_ret is not None:
  91. return error_check_ret
  92. # Return error for unsupported features.
  93. if request.suffix is not None:
  94. return self.create_error_response(
  95. "suffix is not currently supported")
  96. model_name = request.model
  97. request_id = f"cmpl-{random_uuid()}"
  98. created_time = int(time.monotonic())
  99. # Schedule the request and get the result generator.
  100. generators = []
  101. try:
  102. sampling_params = request.to_sampling_params()
  103. lora_request = self._maybe_get_lora(request)
  104. guided_decode_logit_processor = (
  105. await get_guided_decoding_logits_processor(
  106. request, self.engine.get_tokenizer()))
  107. if guided_decode_logit_processor is not None:
  108. if sampling_params.logits_processors is None:
  109. sampling_params.logits_processors = []
  110. sampling_params.logits_processors.append(
  111. guided_decode_logit_processor)
  112. prompt_is_tokens, prompts = parse_prompt_format(request.prompt)
  113. for i, prompt in enumerate(prompts):
  114. if prompt_is_tokens:
  115. input_ids = self._validate_prompt_and_tokenize(
  116. request, prompt_ids=prompt)
  117. else:
  118. input_ids = self._validate_prompt_and_tokenize(
  119. request, prompt=prompt)
  120. generators.append(
  121. self.engine.generate(prompt,
  122. sampling_params,
  123. f"{request_id}-{i}",
  124. prompt_token_ids=input_ids,
  125. lora_request=lora_request))
  126. except ValueError as e:
  127. return self.create_error_response(str(e))
  128. result_generator: AsyncIterator[Tuple[
  129. int, RequestOutput]] = merge_async_iterators(*generators)
  130. # Similar to the OpenAI API, when n != best_of, we do not stream the
  131. # results. In addition, we do not stream the results when use beam search.
  132. stream = (request.stream
  133. and (request.best_of is None or request.n == request.best_of)
  134. and not request.use_beam_search)
  135. # Streaming response
  136. if stream:
  137. return self.completion_stream_generator(request,
  138. raw_request,
  139. result_generator,
  140. request_id,
  141. created_time,
  142. model_name,
  143. num_prompts=len(prompts))
  144. # Non-streaming response
  145. final_res_batch: RequestOutput = [None] * len(prompts)
  146. try:
  147. async for i, res in result_generator:
  148. if await raw_request.is_disconnected():
  149. # Abort the request if the client disconnects.
  150. await self.engine.abort(f"{request_id}-{i}")
  151. return self.create_error_response("Client disconnected")
  152. final_res_batch[i] = res
  153. response = self.request_output_to_completion_response(
  154. final_res_batch, request, request_id, created_time, model_name)
  155. except ValueError as e:
  156. # TODO: Use a aphrodite-specific Validation Error
  157. return self.create_error_response(str(e))
  158. # When user requests streaming but we don't stream, we still need to
  159. # return a streaming response with a single event.
  160. if request.stream:
  161. response_json = response.model_dump_json()
  162. async def fake_stream_generator() -> AsyncGenerator[str, None]:
  163. yield f"data: {response_json}\n\n"
  164. yield "data: [DONE]\n\n"
  165. return fake_stream_generator()
  166. return response
  167. async def completion_stream_generator(
  168. self,
  169. request: CompletionRequest,
  170. raw_request: Request,
  171. result_generator: AsyncIterator[Tuple[int, RequestOutput]],
  172. request_id: str,
  173. created_time: int,
  174. model_name: str,
  175. num_prompts: int,
  176. ) -> AsyncGenerator[str, None]:
  177. previous_texts = [""] * request.n * num_prompts
  178. previous_num_tokens = [0] * request.n * num_prompts
  179. has_echoed = [False] * request.n * num_prompts
  180. try:
  181. async for prompt_idx, res in result_generator:
  182. # Abort the request if the client disconnects.
  183. if await raw_request.is_disconnected():
  184. await self.engine.abort(f"{request_id}-{prompt_idx}")
  185. raise StopAsyncIteration()
  186. for output in res.outputs:
  187. i = output.index + prompt_idx * request.n
  188. # TODO: optimize the performance by avoiding full text O(n^2) sending.
  189. if request.echo and request.max_tokens == 0:
  190. # only return the prompt
  191. delta_text = res.prompt
  192. delta_token_ids = res.prompt_token_ids
  193. top_logprobs = res.prompt_logprobs
  194. has_echoed[i] = True
  195. elif request.echo and request.max_tokens > 0 and not has_echoed[
  196. i]:
  197. # echo the prompt and first token
  198. delta_text = res.prompt + output.text
  199. delta_token_ids = res.prompt_token_ids + output.token_ids
  200. top_logprobs = res.prompt_logprobs + (output.logprobs
  201. or [])
  202. has_echoed[i] = True
  203. else:
  204. # return just the delta
  205. delta_text = output.text[len(previous_texts[i]):]
  206. delta_token_ids = output.token_ids[
  207. previous_num_tokens[i]:]
  208. top_logprobs = output.logprobs[previous_num_tokens[
  209. i]:] if output.logprobs else None
  210. if request.logprobs is not None:
  211. assert top_logprobs is not None, "top_logprobs must be provided when logprobs is requested"
  212. logprobs = self._create_logprobs(
  213. token_ids=delta_token_ids,
  214. top_logprobs=top_logprobs,
  215. num_output_top_logprobs=request.logprobs,
  216. initial_text_offset=len(previous_texts[i]),
  217. )
  218. else:
  219. logprobs = None
  220. previous_texts[i] = output.text
  221. previous_num_tokens[i] = len(output.token_ids)
  222. finish_reason = output.finish_reason
  223. response_json = CompletionStreamResponse(
  224. id=request_id,
  225. created=created_time,
  226. model=model_name,
  227. choices=[
  228. CompletionResponseStreamChoice(
  229. index=i,
  230. text=delta_text,
  231. logprobs=logprobs,
  232. finish_reason=finish_reason,
  233. )
  234. ]).model_dump_json()
  235. yield f"data: {response_json}\n\n"
  236. if output.finish_reason is not None: # return final usage
  237. logprobs = LogProbs(
  238. ) if request.logprobs is not None else None
  239. prompt_tokens = len(res.prompt_token_ids)
  240. completion_tokens = len(output.token_ids)
  241. final_usage = UsageInfo(
  242. prompt_tokens=prompt_tokens,
  243. completion_tokens=completion_tokens,
  244. total_tokens=prompt_tokens + completion_tokens,
  245. )
  246. response_json = CompletionStreamResponse(
  247. id=request_id,
  248. created=created_time,
  249. model=model_name,
  250. choices=[
  251. CompletionResponseStreamChoice(
  252. index=i,
  253. text="",
  254. logprobs=logprobs,
  255. finish_reason=output.finish_reason,
  256. )
  257. ],
  258. usage=final_usage,
  259. ).model_dump_json()
  260. yield f"data: {response_json}\n\n"
  261. except ValueError as e:
  262. # TODO: Use an aphrodite-specific Validation Error
  263. data = self.create_streaming_error_response(str(e))
  264. yield f"data: {data}\n\n"
  265. yield "data: [DONE]\n\n"
  266. def request_output_to_completion_response(
  267. self,
  268. final_res_batch: List[RequestOutput],
  269. request: CompletionRequest,
  270. request_id: str,
  271. created_time: int,
  272. model_name: str,
  273. ) -> CompletionResponse:
  274. choices = []
  275. num_prompt_tokens = 0
  276. num_generated_tokens = 0
  277. for final_res in final_res_batch:
  278. assert final_res is not None
  279. prompt_token_ids = final_res.prompt_token_ids
  280. prompt_logprobs = final_res.prompt_logprobs
  281. prompt_text = final_res.prompt
  282. for output in final_res.outputs:
  283. if request.echo and request.max_tokens == 0:
  284. token_ids = prompt_token_ids
  285. top_logprobs = prompt_logprobs
  286. output_text = prompt_text
  287. elif request.echo and request.max_tokens > 0:
  288. token_ids = prompt_token_ids + output.token_ids
  289. top_logprobs = prompt_logprobs + output.logprobs
  290. output_text = prompt_text + output.text
  291. else:
  292. token_ids = output.token_ids
  293. top_logprobs = output.logprobs
  294. output_text = output.text
  295. if request.logprobs is not None:
  296. logprobs = self._create_logprobs(
  297. token_ids=token_ids,
  298. top_logprobs=top_logprobs,
  299. num_output_top_logprobs=request.logprobs,
  300. )
  301. else:
  302. logprobs = None
  303. choice_data = CompletionResponseChoice(
  304. index=len(choices),
  305. text=output_text,
  306. logprobs=logprobs,
  307. finish_reason=output.finish_reason,
  308. )
  309. choices.append(choice_data)
  310. num_prompt_tokens += len(prompt_token_ids)
  311. num_generated_tokens += sum(
  312. len(output.token_ids) for output in final_res.outputs)
  313. usage = UsageInfo(
  314. prompt_tokens=num_prompt_tokens,
  315. completion_tokens=num_generated_tokens,
  316. total_tokens=num_prompt_tokens + num_generated_tokens,
  317. )
  318. return CompletionResponse(
  319. id=request_id,
  320. created=created_time,
  321. model=model_name,
  322. choices=choices,
  323. usage=usage,
  324. )