1
0

serving.py 12 KB


  1. """Benchmark online serving throughput.
  2. On the server side, run one of the following commands:
  3. (Aphrodite backend)
  4. python -m aphrodite.endpoints.openai.api_server \
  5. --model <your_model> --swap-space 16 \
  6. --disable-log-requests
  7. (TGI backend)
  8. ./launch_tgi_server.sh <your_model> <max_batch_total_tokens>
  9. On the client side, run:
  10. python tests/benchmarks/serving.py \
  11. --backend <backend> \
  12. --tokenizer <your_model> --dataset <target_dataset> \
  13. --request-rate <request_rate>
  14. """
  15. import argparse
  16. import asyncio
  17. import json
  18. import random
  19. import time
  20. from dataclasses import dataclass
  21. from datetime import datetime
  22. from typing import AsyncGenerator, List, Tuple
  23. import numpy as np
  24. from tqdm.asyncio import tqdm
  25. from transformers import PreTrainedTokenizerBase
  26. from aphrodite.transformers_utils.tokenizer import get_tokenizer
  27. from backend_request_func import (
  28. ASYNC_REQUEST_FUNCS,
  29. RequestFuncInput,
  30. RequestFuncOutput,
  31. )
  32. @dataclass
  33. class BenchmarkMetrics:
  34. completed: int
  35. total_input: int
  36. total_output: int
  37. request_throughput: float
  38. input_throughput: float
  39. output_throughput: float
  40. mean_ttft_ms: float
  41. median_ttft_ms: float
  42. p99_ttft_ms: float
  43. mean_tpot_ms: float
  44. median_tpot_ms: float
  45. p99_tpot_ms: float
  46. def sample_requests(
  47. dataset_path: str,
  48. num_requests: int,
  49. tokenizer: PreTrainedTokenizerBase,
  50. ) -> List[Tuple[str, int, int]]:
  51. # Load the dataset.
  52. with open(dataset_path) as f:
  53. dataset = json.load(f)
  54. # Filter out the conversations with less than 2 turns.
  55. dataset = [data for data in dataset if len(data["conversations"]) >= 2]
  56. # Only keep the first two turns of each conversation.
  57. dataset = [(data["conversations"][0]["value"],
  58. data["conversations"][1]["value"]) for data in dataset]
  59. # some of these will be filtered out, so sample more than we need
  60. sampled_indices = random.sample(range(len(dataset)),
  61. int(num_requests * 1.2))
  62. dataset = [dataset[i] for i in sampled_indices]
  63. # Tokenize the prompts and completions.
  64. prompts = [prompt for prompt, _ in dataset]
  65. prompt_token_ids = tokenizer(prompts).input_ids
  66. completions = [completion for _, completion in dataset]
  67. completion_token_ids = tokenizer(completions).input_ids
  68. tokenized_dataset = []
  69. for i in range(len(dataset)):
  70. output_len = len(completion_token_ids[i])
  71. tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len))
  72. # Filter out too long sequences.
  73. filtered_dataset: List[Tuple[str, int, int]] = []
  74. for prompt, prompt_token_ids, output_len in tokenized_dataset:
  75. prompt_len = len(prompt_token_ids)
  76. if prompt_len < 4 or output_len < 4:
  77. # Prune too short sequences.
  78. # This is because TGI causes errors when the input or output length
  79. # is too short.
  80. continue
  81. if prompt_len > 1024 or prompt_len + output_len > 2048:
  82. # Prune too long sequences.
  83. continue
  84. filtered_dataset.append((prompt, prompt_len, output_len))
  85. # Sample the requests.
  86. sampled_requests = random.sample(filtered_dataset, num_requests)
  87. return sampled_requests
  88. async def get_request(
  89. input_requests: List[Tuple[str, int, int]],
  90. request_rate: float,
  91. ) -> AsyncGenerator[Tuple[str, int, int], None]:
  92. input_requests = iter(input_requests)
  93. for request in input_requests:
  94. yield request
  95. if request_rate == float("inf"):
  96. # If the request rate is infinity, then we don't need to wait.
  97. continue
  98. # Sample the request interval from the exponential distribution.
  99. interval = np.random.exponential(1.0 / request_rate)
  100. # The next request will be sent after the interval.
  101. await asyncio.sleep(interval)
  102. def calculate_metrics(
  103. input_requests: List[Tuple[str, int, int]],
  104. outputs: List[RequestFuncOutput],
  105. dur_s: float,
  106. tokenizer: PreTrainedTokenizerBase,
  107. ) -> BenchmarkMetrics:
  108. total_output = 0
  109. total_input = 0
  110. completed = 0
  111. per_token_latencies = []
  112. ttfts = []
  113. for i in range(len(outputs)):
  114. if outputs[i].success:
  115. output_len = len(tokenizer.encode(outputs[i].generated_text))
  116. total_output += output_len
  117. total_input += input_requests[i][1]
  118. per_token_latencies.append(outputs[i].latency / output_len)
  119. ttfts.append(outputs[i].ttft)
  120. completed += 1
  121. metrics = BenchmarkMetrics(
  122. completed=completed,
  123. total_input=total_input,
  124. total_output=total_output,
  125. request_throughput=completed / dur_s,
  126. input_throughput=total_input / dur_s,
  127. output_throughput=total_output / dur_s,
  128. mean_ttft_ms=np.mean(ttfts) * 1000,
  129. median_ttft_ms=np.median(ttfts) * 1000,
  130. p99_ttft_ms=np.percentile(ttfts, 99) * 1000,
  131. mean_tpot_ms=np.mean(per_token_latencies) * 1000,
  132. median_tpot_ms=np.median(per_token_latencies) * 1000,
  133. p99_tpot_ms=np.percentile(per_token_latencies, 99) * 1000,
  134. )
  135. return metrics
  136. async def benchmark(
  137. backend: str,
  138. api_url: str,
  139. model_id: str,
  140. tokenizer: PreTrainedTokenizerBase,
  141. input_requests: List[Tuple[str, int, int]],
  142. best_of: int,
  143. use_beam_search: bool,
  144. request_rate: float,
  145. disable_tqdm: bool,
  146. ):
  147. if backend in ASYNC_REQUEST_FUNCS:
  148. request_func = ASYNC_REQUEST_FUNCS.get(backend)
  149. else:
  150. raise ValueError(f"Unknown backend: {backend}")
  151. pbar = None if disable_tqdm else tqdm(total=len(input_requests))
  152. print(f"Traffic request rate: {request_rate}")
  153. benchmark_start_time = time.perf_counter()
  154. tasks = []
  155. async for request in get_request(input_requests, request_rate):
  156. prompt, prompt_len, output_len = request
  157. request_func_input = RequestFuncInput(
  158. model=model_id,
  159. prompt=prompt,
  160. api_url=api_url,
  161. prompt_len=prompt_len,
  162. output_len=output_len,
  163. best_of=best_of,
  164. use_beam_search=use_beam_search,
  165. )
  166. tasks.append(
  167. asyncio.create_task(
  168. request_func(request_func_input=request_func_input,
  169. pbar=pbar)))
  170. outputs = await asyncio.gather(*tasks)
  171. if not disable_tqdm:
  172. pbar.close()
  173. benchmark_duration = time.perf_counter() - benchmark_start_time
  174. metrics = calculate_metrics(
  175. input_requests=input_requests,
  176. outputs=outputs,
  177. dur_s=benchmark_duration,
  178. tokenizer=tokenizer,
  179. )
  180. print(f"Successful requests: {metrics.completed}")
  181. print(f"Benchmark duration: {benchmark_duration:2f} s")
  182. print(f"Total input tokens: {metrics.total_input}")
  183. print(f"Total generated tokens: {metrics.total_output}")
  184. print(f"Request throughput: {metrics.request_throughput:.2f} requests/s")
  185. print(f"Input token throughput: {metrics.input_throughput:.2f} tokens/s")
  186. print(f"Output token throughput: {metrics.output_throughput:.2f} tokens/s")
  187. print(f"Mean TTFT: {metrics.mean_ttft_ms:.2f} ms")
  188. print(f"Median TTFT: {metrics.median_ttft_ms:.2f} ms")
  189. print(f"P99 TTFT: {metrics.p99_ttft_ms:.2f} ms")
  190. print(f"Mean TPOT: {metrics.mean_tpot_ms:.2f} ms")
  191. print(f"Median TPOT: {metrics.median_tpot_ms:.2f} ms")
  192. print(f"P99 TPOT: {metrics.p99_tpot_ms:.2f} ms")
  193. result = {
  194. "duration": benchmark_duration,
  195. "completed": metrics.completed,
  196. "total_input_tokens": metrics.total_input,
  197. "total_output_tokens": metrics.total_output,
  198. "request_inthroughput": metrics.request_throughput,
  199. "input_throughput": metrics.input_throughput,
  200. "output_throughput": metrics.output_throughput,
  201. "mean_ttft_ms": metrics.mean_ttft_ms,
  202. "median_ttft_ms": metrics.median_ttft_ms,
  203. "p99_ttft_ms": metrics.p99_ttft_ms,
  204. "mean_tpot_ms": metrics.mean_tpot_ms,
  205. "median_tpot_ms": metrics.median_tpot_ms,
  206. "p99_tpot_ms": metrics.p99_tpot_ms
  207. }
  208. return result
  209. def main(args: argparse.Namespace):
  210. print(args)
  211. random.seed(args.seed)
  212. np.random.seed(args.seed)
  213. backend = args.backend
  214. model_id = args.model
  215. tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model
  216. if args.base_url is not None:
  217. api_url = f"{args.base_url}{args.endpoint}"
  218. else:
  219. api_url = f"http://{args.host}:{args.port}{args.endpoint}"
  220. tokenizer = get_tokenizer(tokenizer_id,
  221. trust_remote_code=args.trust_remote_code)
  222. input_requests = sample_requests(args.dataset, args.num_prompts, tokenizer)
  223. benchmark_result = asyncio.run(
  224. benchmark(
  225. backend=backend,
  226. api_url=api_url,
  227. model_id=model_id,
  228. tokenizer=tokenizer,
  229. input_requests=input_requests,
  230. best_of=args.best_of,
  231. use_beam_search=args.use_beam_search,
  232. request_rate=args.request_rate,
  233. disable_tqdm=args.disable_tqdm,
  234. ))
  235. # Save config and results to json
  236. if args.save_result:
  237. result_json = {}
  238. # Setup
  239. current_dt = datetime.now().strftime("%Y%m%d-%H%M%S")
  240. result_json["date"] = current_dt
  241. result_json["backend"] = backend
  242. result_json["version"] = args.version
  243. result_json["model_id"] = model_id
  244. result_json["tokenizer_id"] = tokenizer_id
  245. result_json["best_of"] = args.best_of
  246. result_json["use_beam_search"] = args.use_beam_search
  247. result_json["num_prompts"] = args.num_prompts
  248. # Traffic
  249. result_json["request_rate"] = (
  250. args.request_rate if args.request_rate < float("inf") else "inf")
  251. # Merge with benchmark result
  252. result_json = {**result_json, **benchmark_result}
  253. # Save to file
  254. base_model_id = model_id.split("/")[-1]
  255. file_name = f"{backend}-{args.request_rate}qps-{base_model_id}-"
  256. f"{current_dt}.json"
  257. with open(file_name, "w") as outfile:
  258. json.dump(result_json, outfile)
  259. if __name__ == "__main__":
  260. parser = argparse.ArgumentParser(
  261. description="Benchmark the online serving throughput.")
  262. parser.add_argument(
  263. "--backend",
  264. type=str,
  265. default="aphrodite",
  266. choices=list(ASYNC_REQUEST_FUNCS.keys()),
  267. )
  268. parser.add_argument(
  269. "--version",
  270. type=str,
  271. default="N/A",
  272. help="Version of the serving backend/engine.",
  273. )
  274. parser.add_argument(
  275. "--base-url",
  276. type=str,
  277. default=None,
  278. help="Server or API base url if not using http host and port.",
  279. )
  280. parser.add_argument("--host", type=str, default="localhost")
  281. parser.add_argument("--port", type=int, default=2242)
  282. parser.add_argument(
  283. "--endpoint",
  284. type=str,
  285. default="/v1/completions",
  286. help="API endpoint.",
  287. )
  288. parser.add_argument("--dataset",
  289. type=str,
  290. required=True,
  291. help="Path to the dataset.")
  292. parser.add_argument(
  293. "--model",
  294. type=str,
  295. required=True,
  296. help="Name of the model.",
  297. )
  298. parser.add_argument(
  299. "--tokenizer",
  300. type=str,
  301. help="Name or path of the tokenizer, if not using the default model "
  302. "tokenizer.",
  303. )
  304. parser.add_argument(
  305. "--best-of",
  306. type=int,
  307. default=1,
  308. help="Generates `best_of` sequences per prompt and "
  309. "returns the best one.",
  310. )
  311. parser.add_argument("--use-beam-search", action="store_true")
  312. parser.add_argument(
  313. "--num-prompts",
  314. type=int,
  315. default=1000,
  316. help="Number of prompts to process.",
  317. )
  318. parser.add_argument(
  319. "--request-rate",
  320. type=float,
  321. default=float("inf"),
  322. help="Number of requests per second. If this is inf, "
  323. "then all the requests are sent at time 0. "
  324. "Otherwise, we use Poisson process to synthesize "
  325. "the request arrival times.",
  326. )
  327. parser.add_argument("--seed", type=int, default=0)
  328. parser.add_argument(
  329. "--trust-remote-code",
  330. action="store_true",
  331. help="Trust remote code from huggingface",
  332. )
  333. parser.add_argument(
  334. "--disable-tqdm",
  335. action="store_true",
  336. help="Specify to disable tqdm progress bar.",
  337. )
  338. parser.add_argument(
  339. "--save-result",
  340. action="store_true",
  341. help="Specify to save benchmark results to a json file",
  342. )
  343. args = parser.parse_args()
  344. main(args)