serving_completions.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. import asyncio
  2. import time
  3. from typing import (AsyncGenerator, AsyncIterator, Callable, Dict, List,
  4. Optional)
  5. from typing import Sequence as GenericSequence
  6. from typing import Tuple, Union, cast
  7. from fastapi import Request
  8. from aphrodite.common.config import ModelConfig
  9. from aphrodite.common.outputs import RequestOutput
  10. from aphrodite.common.sequence import Logprob
  11. from aphrodite.common.utils import merge_async_iterators, random_uuid
  12. from aphrodite.endpoints.logger import RequestLogger
  13. from aphrodite.endpoints.openai.protocol import (
  14. CompletionLogProbs, CompletionRequest, CompletionResponse,
  15. CompletionResponseChoice, CompletionResponseStreamChoice,
  16. CompletionStreamResponse, ErrorResponse, UsageInfo)
  17. from aphrodite.endpoints.openai.serving_engine import (BaseModelPath,
  18. LoRAModulePath,
  19. OpenAIServing,
  20. PromptAdapterPath)
  21. from aphrodite.engine.protocol import EngineClient
  22. from aphrodite.transformers_utils.tokenizer import AnyTokenizer
  23. TypeTokenIDs = List[int]
  24. TypeTopLogProbs = List[Optional[Dict[int, float]]]
  25. TypeCreateLogProbsFn = Callable[
  26. [TypeTokenIDs, TypeTopLogProbs, Optional[int], int], CompletionLogProbs]
  27. class OpenAIServingCompletion(OpenAIServing):
  28. def __init__(
  29. self,
  30. engine_client: EngineClient,
  31. model_config: ModelConfig,
  32. base_model_paths: List[BaseModelPath],
  33. *,
  34. lora_modules: Optional[List[LoRAModulePath]],
  35. prompt_adapters: Optional[List[PromptAdapterPath]],
  36. request_logger: Optional[RequestLogger],
  37. return_tokens_as_token_ids: bool = False,
  38. ):
  39. super().__init__(engine_client=engine_client,
  40. model_config=model_config,
  41. base_model_paths=base_model_paths,
  42. lora_modules=lora_modules,
  43. prompt_adapters=prompt_adapters,
  44. request_logger=request_logger,
  45. return_tokens_as_token_ids=return_tokens_as_token_ids)
  46. async def create_completion(
  47. self,
  48. request: CompletionRequest,
  49. raw_request: Request,
  50. ) -> Union[AsyncGenerator[str, None], CompletionResponse, ErrorResponse]:
  51. """Completion API similar to OpenAI's API.
  52. See https://platform.openai.com/docs/api-reference/completions/create
  53. for the API specification. This API mimics the OpenAI Completion API.
  54. NOTE: Currently we do not support the following feature:
  55. - suffix (the language models we currently support do not support
  56. suffix)
  57. """
  58. error_check_ret = await self._check_model(request)
  59. if error_check_ret is not None:
  60. return error_check_ret
  61. # If the engine is dead, raise the engine's DEAD_ERROR.
  62. # This is required for the streaming case, where we return a
  63. # success status before we actually start generating text :).
  64. if self.engine_client.errored:
  65. raise self.engine_client.dead_error
  66. # Return error for unsupported features.
  67. if request.suffix is not None:
  68. return self.create_error_response(
  69. "suffix is not currently supported")
  70. model_name = self.base_model_paths[0].name
  71. request_id = f"cmpl-{random_uuid()}"
  72. created_time = int(time.time())
  73. # Schedule the request and get the result generator.
  74. generators: List[AsyncGenerator[RequestOutput, None]] = []
  75. try:
  76. (
  77. lora_request,
  78. prompt_adapter_request,
  79. ) = self._maybe_get_adapters(request)
  80. tokenizer = await self.engine_client.get_tokenizer(lora_request)
  81. guided_decode_logits_processor = (
  82. await self._guided_decode_logits_processor(request, tokenizer))
  83. prompts = list(
  84. self._tokenize_prompt_input_or_inputs(
  85. request,
  86. tokenizer,
  87. request.prompt,
  88. truncate_prompt_tokens=request.truncate_prompt_tokens,
  89. add_special_tokens=request.add_special_tokens,
  90. ))
  91. for i, prompt_inputs in enumerate(prompts):
  92. sampling_params = request.to_sampling_params(
  93. tokenizer,
  94. guided_decode_logits_processor,
  95. default_max_tokens=self.max_model_len -
  96. len(prompt_inputs["prompt_token_ids"]))
  97. request_id_item = f"{request_id}-{i}"
  98. self._log_inputs(request_id_item,
  99. prompt_inputs,
  100. params=sampling_params,
  101. lora_request=lora_request,
  102. prompt_adapter_request=prompt_adapter_request)
  103. generator = self.engine_client.generate(
  104. {"prompt_token_ids": prompt_inputs["prompt_token_ids"]},
  105. sampling_params,
  106. request_id_item,
  107. lora_request=lora_request,
  108. prompt_adapter_request=prompt_adapter_request,
  109. )
  110. generators.append(generator)
  111. except ValueError as e:
  112. # TODO: Use an aphrodite-specific Validation Error
  113. return self.create_error_response(str(e))
  114. result_generator = merge_async_iterators(
  115. *generators, is_cancelled=raw_request.is_disconnected)
  116. # Similar to the OpenAI API, when n != best_of, we do not stream the
  117. # results. In addition, we do not stream the results when use
  118. # beam search.
  119. stream = (request.stream
  120. and (request.best_of is None or request.n == request.best_of)
  121. and not request.use_beam_search)
  122. # Streaming response
  123. if stream:
  124. return self.completion_stream_generator(request,
  125. result_generator,
  126. request_id,
  127. created_time,
  128. model_name,
  129. num_prompts=len(prompts),
  130. tokenizer=tokenizer)
  131. # Non-streaming response
  132. final_res_batch: List[Optional[RequestOutput]] = [None] * len(prompts)
  133. try:
  134. async for i, res in result_generator:
  135. final_res_batch[i] = res
  136. for i, final_res in enumerate(final_res_batch):
  137. assert final_res is not None
  138. # The output should contain the input text
  139. # We did not pass it into Aphrodite engine to avoid being
  140. # redundant with the inputs token IDs
  141. if final_res.prompt is None:
  142. final_res.prompt = prompts[i]["prompt"]
  143. final_res_batch_checked = cast(List[RequestOutput],
  144. final_res_batch)
  145. response = self.request_output_to_completion_response(
  146. final_res_batch_checked,
  147. request,
  148. request_id,
  149. created_time,
  150. model_name,
  151. tokenizer,
  152. )
  153. except asyncio.CancelledError:
  154. return self.create_error_response("Client disconnected")
  155. except ValueError as e:
  156. # TODO: Use an aphrodite-specific Validation Error
  157. return self.create_error_response(str(e))
  158. # When user requests streaming but we don't stream, we still need to
  159. # return a streaming response with a single event.
  160. if request.stream:
  161. response_json = response.model_dump_json()
  162. async def fake_stream_generator() -> AsyncGenerator[str, None]:
  163. yield f"data: {response_json}\n\n"
  164. yield "data: [DONE]\n\n"
  165. return fake_stream_generator()
  166. return response
  167. async def completion_stream_generator(
  168. self,
  169. request: CompletionRequest,
  170. result_generator: AsyncIterator[Tuple[int, RequestOutput]],
  171. request_id: str,
  172. created_time: int,
  173. model_name: str,
  174. num_prompts: int,
  175. tokenizer: AnyTokenizer,
  176. ) -> AsyncGenerator[str, None]:
  177. num_choices = 1 if request.n is None else request.n
  178. previous_text_lens = [0] * num_choices * num_prompts
  179. previous_num_tokens = [0] * num_choices * num_prompts
  180. has_echoed = [False] * num_choices * num_prompts
  181. num_prompt_tokens = [0] * num_prompts
  182. try:
  183. async for prompt_idx, res in result_generator:
  184. prompt_token_ids = res.prompt_token_ids
  185. prompt_logprobs = res.prompt_logprobs
  186. prompt_text = res.prompt
  187. # Prompt details are excluded from later streamed outputs
  188. if res.prompt_token_ids is not None:
  189. num_prompt_tokens[prompt_idx] = len(res.prompt_token_ids)
  190. delta_token_ids: GenericSequence[int]
  191. out_logprobs: Optional[GenericSequence[Optional[Dict[
  192. int, Logprob]]]]
  193. for output in res.outputs:
  194. i = output.index + prompt_idx * num_choices
  195. # TODO: optimize the performance by avoiding full
  196. # text O(n^2) sending.
  197. assert request.max_tokens is not None
  198. if request.echo and request.max_tokens == 0:
  199. assert prompt_token_ids is not None
  200. assert prompt_text is not None
  201. # only return the prompt
  202. delta_text = prompt_text
  203. delta_token_ids = prompt_token_ids
  204. out_logprobs = prompt_logprobs
  205. has_echoed[i] = True
  206. elif (request.echo and request.max_tokens > 0
  207. and not has_echoed[i]):
  208. assert prompt_token_ids is not None
  209. assert prompt_text is not None
  210. assert prompt_logprobs is not None
  211. # echo the prompt and first token
  212. delta_text = prompt_text + output.text
  213. delta_token_ids = [
  214. *prompt_token_ids, *output.token_ids
  215. ]
  216. out_logprobs = [
  217. *prompt_logprobs,
  218. *(output.logprobs or []),
  219. ]
  220. has_echoed[i] = True
  221. else:
  222. # return just the delta
  223. delta_text = output.text
  224. delta_token_ids = output.token_ids
  225. out_logprobs = output.logprobs
  226. if request.logprobs is not None:
  227. assert out_logprobs is not None, (
  228. "Did not output logprobs")
  229. logprobs = self._create_completion_logprobs(
  230. token_ids=delta_token_ids,
  231. top_logprobs=out_logprobs,
  232. num_output_top_logprobs=request.logprobs,
  233. tokenizer=tokenizer,
  234. initial_text_offset=previous_text_lens[i],
  235. )
  236. else:
  237. logprobs = None
  238. previous_text_lens[i] += len(output.text)
  239. previous_num_tokens[i] += len(output.token_ids)
  240. finish_reason = output.finish_reason
  241. stop_reason = output.stop_reason
  242. chunk = CompletionStreamResponse(
  243. id=request_id,
  244. created=created_time,
  245. model=model_name,
  246. choices=[
  247. CompletionResponseStreamChoice(
  248. index=i,
  249. text=delta_text,
  250. logprobs=logprobs,
  251. finish_reason=finish_reason,
  252. stop_reason=stop_reason,
  253. )
  254. ])
  255. if (request.stream_options
  256. and request.stream_options.include_usage):
  257. if (request.stream_options.continuous_usage_stats
  258. or output.finish_reason is not None):
  259. prompt_tokens = num_prompt_tokens[prompt_idx]
  260. completion_tokens = previous_num_tokens[i]
  261. usage = UsageInfo(
  262. prompt_tokens=prompt_tokens,
  263. completion_tokens=completion_tokens,
  264. total_tokens=prompt_tokens + completion_tokens,
  265. )
  266. if request.stream_options.continuous_usage_stats:
  267. chunk.usage = usage
  268. else:
  269. chunk.usage = None
  270. response_json = chunk.model_dump_json(exclude_unset=False)
  271. yield f"data: {response_json}\n\n"
  272. if (request.stream_options
  273. and request.stream_options.include_usage):
  274. final_usage_chunk = CompletionStreamResponse(
  275. id=request_id,
  276. created=created_time,
  277. model=model_name,
  278. choices=[],
  279. usage=usage,
  280. )
  281. final_usage_data = (final_usage_chunk.model_dump_json(
  282. exclude_unset=False, exclude_none=True))
  283. yield f"data: {final_usage_data}\n\n"
  284. except ValueError as e:
  285. # TODO: Use an aphrodite-specific Validation Error
  286. data = self.create_streaming_error_response(str(e))
  287. yield f"data: {data}\n\n"
  288. yield "data: [DONE]\n\n"
  289. def request_output_to_completion_response(
  290. self,
  291. final_res_batch: List[RequestOutput],
  292. request: CompletionRequest,
  293. request_id: str,
  294. created_time: int,
  295. model_name: str,
  296. tokenizer: AnyTokenizer,
  297. ) -> CompletionResponse:
  298. choices: List[CompletionResponseChoice] = []
  299. num_prompt_tokens = 0
  300. num_generated_tokens = 0
  301. for final_res in final_res_batch:
  302. prompt_token_ids = final_res.prompt_token_ids
  303. assert prompt_token_ids is not None
  304. prompt_logprobs = final_res.prompt_logprobs
  305. prompt_text = final_res.prompt
  306. token_ids: GenericSequence[int]
  307. out_logprobs: Optional[GenericSequence[Optional[Dict[int,
  308. Logprob]]]]
  309. for output in final_res.outputs:
  310. assert request.max_tokens is not None
  311. if request.echo and request.max_tokens == 0:
  312. assert prompt_text is not None
  313. token_ids = prompt_token_ids
  314. out_logprobs = prompt_logprobs
  315. output_text = prompt_text
  316. elif request.echo and request.max_tokens > 0:
  317. assert prompt_text is not None
  318. token_ids = [*prompt_token_ids, *output.token_ids]
  319. if request.logprobs is None:
  320. out_logprobs = None
  321. else:
  322. assert prompt_logprobs is not None
  323. assert output.logprobs is not None
  324. out_logprobs = [
  325. *prompt_logprobs,
  326. *output.logprobs,
  327. ]
  328. output_text = prompt_text + output.text
  329. else:
  330. token_ids = output.token_ids
  331. out_logprobs = output.logprobs
  332. output_text = output.text
  333. if request.logprobs is not None:
  334. assert out_logprobs is not None, "Did not output logprobs"
  335. logprobs = self._create_completion_logprobs(
  336. token_ids=token_ids,
  337. top_logprobs=out_logprobs,
  338. tokenizer=tokenizer,
  339. num_output_top_logprobs=request.logprobs,
  340. )
  341. else:
  342. logprobs = None
  343. choice_data = CompletionResponseChoice(
  344. index=len(choices),
  345. text=output_text,
  346. logprobs=logprobs,
  347. finish_reason=output.finish_reason,
  348. stop_reason=output.stop_reason,
  349. prompt_logprobs=final_res.prompt_logprobs,
  350. )
  351. choices.append(choice_data)
  352. num_generated_tokens += len(output.token_ids)
  353. num_prompt_tokens += len(prompt_token_ids)
  354. usage = UsageInfo(
  355. prompt_tokens=num_prompt_tokens,
  356. completion_tokens=num_generated_tokens,
  357. total_tokens=num_prompt_tokens + num_generated_tokens,
  358. )
  359. return CompletionResponse(
  360. id=request_id,
  361. created=created_time,
  362. model=model_name,
  363. choices=choices,
  364. usage=usage,
  365. )
  366. def _create_completion_logprobs(
  367. self,
  368. token_ids: GenericSequence[int],
  369. top_logprobs: GenericSequence[Optional[Dict[int, Logprob]]],
  370. num_output_top_logprobs: int,
  371. tokenizer: AnyTokenizer,
  372. initial_text_offset: int = 0,
  373. ) -> CompletionLogProbs:
  374. """Create logprobs for OpenAI Completion API."""
  375. out_text_offset: List[int] = []
  376. out_token_logprobs: List[Optional[float]] = []
  377. out_tokens: List[str] = []
  378. out_top_logprobs: List[Optional[Dict[str, float]]] = []
  379. last_token_len = 0
  380. for i, token_id in enumerate(token_ids):
  381. step_top_logprobs = top_logprobs[i]
  382. if step_top_logprobs is None:
  383. token = tokenizer.decode(token_id)
  384. if self.return_tokens_as_token_ids:
  385. token = f"token_id:{token_id}"
  386. out_tokens.append(token)
  387. out_token_logprobs.append(None)
  388. out_top_logprobs.append(None)
  389. else:
  390. step_token = step_top_logprobs[token_id]
  391. token = self._get_decoded_token(
  392. step_token,
  393. token_id,
  394. tokenizer,
  395. return_as_token_id=self.return_tokens_as_token_ids,
  396. )
  397. token_logprob = max(step_token.logprob, -9999.0)
  398. out_tokens.append(token)
  399. out_token_logprobs.append(token_logprob)
  400. # makes sure to add the top num_output_top_logprobs + 1
  401. # logprobs, as defined in the openai API
  402. # (cf. https://github.com/openai/openai-openapi/blob/
  403. # 893ba52242dbd5387a97b96444ee1c742cfce9bd/openapi.yaml#L7153)
  404. out_top_logprobs.append({
  405. # Convert float("-inf") to the
  406. # JSON-serializable float that OpenAI uses
  407. self._get_decoded_token(
  408. top_lp[1],
  409. top_lp[0],
  410. tokenizer,
  411. return_as_token_id=self.return_tokens_as_token_ids):
  412. max(top_lp[1].logprob, -9999.0)
  413. for i, top_lp in enumerate(step_top_logprobs.items())
  414. if num_output_top_logprobs >= i
  415. })
  416. if len(out_text_offset) == 0:
  417. out_text_offset.append(initial_text_offset)
  418. else:
  419. out_text_offset.append(out_text_offset[-1] + last_token_len)
  420. last_token_len = len(token)
  421. return CompletionLogProbs(
  422. text_offset=out_text_offset,
  423. token_logprobs=out_token_logprobs,
  424. tokens=out_tokens,
  425. top_logprobs=out_top_logprobs,
  426. )