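"""Benchmark online serving throughput for an Aphrodite or TGI server.

Example invocation (the script name, dataset path, and tokenizer are
placeholders; substitute your own):

    python benchmark_serving.py --backend aphrodite \
        --dataset ShareGPT_V3_unfiltered_cleaned_split.json \
        --tokenizer EleutherAI/pythia-12b \
        --num-prompts 1000 --request-rate 10
"""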

import argparse
import asyncio
import json
import random
import time
from typing import AsyncGenerator, List, Tuple

import aiohttp
import numpy as np
from transformers import PreTrainedTokenizerBase

from aphrodite.transformers_utils.tokenizer import get_tokenizer

# (prompt len, output len, latency)
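# Filled in by send_request() as each request completes; main() reads it
# to compute the latency statistics after the run.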
REQUEST_LATENCY: List[Tuple[int, int, float]] = []


def sample_requests(
    dataset_path: str,
    num_requests: int,
    tokenizer: PreTrainedTokenizerBase,
) -> List[Tuple[str, int, int]]:
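    """Load a ShareGPT-style JSON dataset and sample benchmark requests.

    Returns a list of (prompt, prompt_len, output_len) tuples, where the
    output length is taken from the reference completion so the benchmark
    replays realistic generation lengths.
    """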
    # Load the dataset.
    with open(dataset_path) as f:
        dataset = json.load(f)
    # Filter out the conversations with fewer than 2 turns.
    dataset = [data for data in dataset if len(data["conversations"]) >= 2]
    # Only keep the first two turns of each conversation.
    dataset = [(data["conversations"][0]["value"],
                data["conversations"][1]["value"]) for data in dataset]

    # Tokenize the prompts and completions.
    prompts = [prompt for prompt, _ in dataset]
    prompt_token_ids = tokenizer(prompts).input_ids
    completions = [completion for _, completion in dataset]
    completion_token_ids = tokenizer(completions).input_ids
    tokenized_dataset = []
    for i in range(len(dataset)):
        output_len = len(completion_token_ids[i])
        tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len))

    # Filter out sequences that are too short or too long.
    filtered_dataset: List[Tuple[str, int, int]] = []
    for prompt, token_ids, output_len in tokenized_dataset:
        prompt_len = len(token_ids)
        if prompt_len < 4 or output_len < 4:
            # Prune too short sequences.
            # This is because TGI causes errors when the input or output
            # length is too short.
            continue
        if prompt_len > 1024 or prompt_len + output_len > 2048:
            # Prune too long sequences.
            continue
        filtered_dataset.append((prompt, prompt_len, output_len))

    # Sample the requests.
    sampled_requests = random.sample(filtered_dataset, num_requests)
    return sampled_requests


async def get_request(
    input_requests: List[Tuple[str, int, int]],
    request_rate: float,
) -> AsyncGenerator[Tuple[str, int, int], None]:
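    """Yield requests, pacing arrivals as a Poisson process.

    For a Poisson process with rate `request_rate`, inter-arrival times are
    exponentially distributed with mean 1 / request_rate. An infinite rate
    sends all requests immediately.
    """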
    input_requests = iter(input_requests)
    for request in input_requests:
        yield request

        if request_rate == float("inf"):
            # If the request rate is infinity, then we don't need to wait.
            continue
        # Sample the request interval from the exponential distribution.
        interval = np.random.exponential(1.0 / request_rate)
        # The next request will be sent after the interval.
        await asyncio.sleep(interval)


async def send_request(
    backend: str,
    api_url: str,
    prompt: str,
    prompt_len: int,
    output_len: int,
    best_of: int,
    use_beam_search: bool,
) -> None:
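    """Send one generation request and record its end-to-end latency.

    The request is retried until the server returns a response without an
    "error" field, and the total wall-clock time (including retries) is
    appended to REQUEST_LATENCY.
    """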
    request_start_time = time.perf_counter()

    headers = {"User-Agent": "Benchmark Client"}
    if backend == "aphrodite":
        pload = {
            "prompt": prompt,
            "n": 1,
            "best_of": best_of,
            "use_beam_search": use_beam_search,
            "temperature": 0.0 if use_beam_search else 1.0,
            "top_p": 1.0,
            "max_tokens": output_len,
- "ignore_eos": True,
- "stream": False,
- }
- elif backend == "tgi":
- assert not use_beam_search
- params = {
- "best_of": best_of,
- "max_new_tokens": output_len,
- "do_sample": True,
- }
- pload = {
- "inputs": prompt,
- "parameters": params,
- }
- else:
- raise ValueError(f"Unknown backend: {backend}")
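    # Allow up to 3 hours per request; on a heavily loaded server a request
    # can sit in the queue for a long time before it is scheduled.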
    timeout = aiohttp.ClientTimeout(total=3 * 3600)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while True:
            async with session.post(api_url, headers=headers,
                                    json=pload) as response:
                chunks = []
                async for chunk, _ in response.content.iter_chunks():
                    chunks.append(chunk)
            output = b"".join(chunks).decode("utf-8")
            output = json.loads(output)

            # Re-send the request if it failed.
            if "error" not in output:
                break

    request_end_time = time.perf_counter()
    request_latency = request_end_time - request_start_time
    REQUEST_LATENCY.append((prompt_len, output_len, request_latency))


async def benchmark(
    backend: str,
    api_url: str,
    input_requests: List[Tuple[str, int, int]],
    best_of: int,
    use_beam_search: bool,
    request_rate: float,
) -> None:
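    """Issue all requests at the paced arrival times and wait for them.

    Each request runs as its own asyncio task, so in-flight requests
    overlap and the server sees the intended concurrency.
    """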
    tasks: List[asyncio.Task] = []
    async for request in get_request(input_requests, request_rate):
        prompt, prompt_len, output_len = request
        task = asyncio.create_task(
            send_request(backend, api_url, prompt, prompt_len, output_len,
                         best_of, use_beam_search))
        tasks.append(task)
    await asyncio.gather(*tasks)


def main(args: argparse.Namespace):  # pylint: disable=redefined-outer-name
    print(args)
    random.seed(args.seed)
    np.random.seed(args.seed)

    api_url = f"http://{args.host}:{args.port}/api/v1/generate"
    tokenizer = get_tokenizer(args.tokenizer,
                              trust_remote_code=args.trust_remote_code)
    input_requests = sample_requests(args.dataset, args.num_prompts, tokenizer)

    benchmark_start_time = time.perf_counter()
    asyncio.run(
        benchmark(args.backend, api_url, input_requests, args.best_of,
                  args.use_beam_search, args.request_rate))
    benchmark_end_time = time.perf_counter()
    benchmark_time = benchmark_end_time - benchmark_start_time
    print(f"Total time: {benchmark_time:.2f} s")
    print(f"Throughput: {args.num_prompts / benchmark_time:.2f} requests/s")

    # Compute the latency statistics.
    avg_latency = np.mean([latency for _, _, latency in REQUEST_LATENCY])
    print(f"Average latency: {avg_latency:.2f} s")
    avg_per_token_latency = np.mean([
        latency / (prompt_len + output_len)
        for prompt_len, output_len, latency in REQUEST_LATENCY
    ])
    print(f"Average latency per token: {avg_per_token_latency:.2f} s")
    avg_per_output_token_latency = np.mean(
        [latency / output_len for _, output_len, latency in REQUEST_LATENCY])
    print("Average latency per output token: "
          f"{avg_per_output_token_latency:.2f} s")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Benchmark the online serving throughput.")
    parser.add_argument("--backend",
                        type=str,
                        default="aphrodite",
                        choices=["aphrodite", "tgi"])
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=2242)
    parser.add_argument("--dataset",
                        type=str,
                        required=True,
                        help="Path to the dataset.")
    parser.add_argument("--tokenizer",
                        type=str,
                        required=True,
                        help="Name or path of the tokenizer.")
    parser.add_argument("--best-of",
                        type=int,
                        default=1,
                        help="Generates `best_of` sequences per prompt and "
                        "returns the best one.")
    parser.add_argument("--use-beam-search", action="store_true")
    parser.add_argument("--num-prompts",
                        type=int,
                        default=1000,
                        help="Number of prompts to process.")
- parser.add_argument("--request-rate",
- type=float,
- default=float("inf"),
- help="Number of requests per second. If this is inf, "
- "then all the requests are sent at time 0. "
- "Otherwise, we use Poisson process to synthesize "
- "the request arrival times.")
- parser.add_argument("--seed", type=int, default=0)
- parser.add_argument("--trust-remote-code",
- action="store_true",
- help="trust remote code from huggingface")
- args = parser.parse_args()
- main(args)
|