123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
- import functools
- import os
- import signal
- import subprocess
- import sys
- import time
- import warnings
- from contextlib import contextmanager
- from pathlib import Path
- from typing import Any, Callable, Dict, List, Optional
- import openai
- import requests
- from openai.types.completion import Completion
- from transformers import AutoTokenizer
- from typing_extensions import ParamSpec
- from aphrodite.common.utils import (FlexibleArgumentParser, get_open_port,
- is_hip)
- from aphrodite.distributed import (ensure_model_parallel_initialized,
- init_distributed_environment)
- from aphrodite.endpoints.openai.args import make_arg_parser
- from aphrodite.engine.args_tools import AsyncEngineArgs
- from aphrodite.modeling.model_loader.loader import DefaultModelLoader
- from aphrodite.platforms import current_platform
- from tests.models.utils import TextTextLogprobs
- if current_platform.is_rocm():
- from amdsmi import (amdsmi_get_gpu_vram_usage,
- amdsmi_get_processor_handles, amdsmi_init,
- amdsmi_shut_down)
- @contextmanager
- def _nvml():
- try:
- amdsmi_init()
- yield
- finally:
- amdsmi_shut_down()
- elif current_platform.is_cuda():
- from pynvml import (nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo,
- nvmlInit, nvmlShutdown)
- @contextmanager
- def _nvml():
- try:
- nvmlInit()
- yield
- finally:
- nvmlShutdown()
- else:
- @contextmanager
- def _nvml():
- yield
- APHRODITE_PATH = Path(__file__).parent.parent
- """Path to root of the Aphrodite repository."""
- class RemoteOpenAIServer:
- DUMMY_API_KEY = "token-abc123"
- def __init__(self,
- model: str,
- aphrodite_serve_args: List[str],
- *,
- env_dict: Optional[Dict[str, str]] = None,
- auto_port: bool = True,
- max_wait_seconds: Optional[float] = None) -> None:
- if auto_port:
- if "-p" in aphrodite_serve_args or "--port" in aphrodite_serve_args:
- raise ValueError("You have manually specified the port "
- "when `auto_port=True`.")
- # Don't mutate the input args
- aphrodite_serve_args = aphrodite_serve_args + [
- "--port", str(get_open_port())
- ]
- parser = FlexibleArgumentParser(
- description="Aphrodite's remote OpenAI server.")
- parser = make_arg_parser(parser)
- args = parser.parse_args(["--model", model, *aphrodite_serve_args])
- self.host = str(args.host or 'localhost')
- self.port = int(args.port)
- # download the model before starting the server to avoid timeout
- is_local = os.path.isdir(model)
- if not is_local:
- engine_args = AsyncEngineArgs.from_cli_args(args)
- engine_config = engine_args.create_engine_config()
- dummy_loader = DefaultModelLoader(engine_config.load_config)
- dummy_loader._prepare_weights(engine_config.model_config.model,
- engine_config.model_config.revision,
- fall_back_to_pt=True)
- env = os.environ.copy()
- # the current process might initialize cuda,
- # to be safe, we should use spawn method
- env['APHRODITE_WORKER_MULTIPROC_METHOD'] = 'spawn'
- if env_dict is not None:
- env.update(env_dict)
- self.proc = subprocess.Popen(
- ["aphrodite", "run", model, *aphrodite_serve_args],
- env=env,
- stdout=sys.stdout,
- stderr=sys.stderr,
- )
- max_wait_seconds = max_wait_seconds or 240
- self._wait_for_server(url=self.url_for("health"),
- timeout=max_wait_seconds)
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_value, traceback):
- self.proc.terminate()
- try:
- self.proc.wait(3)
- except subprocess.TimeoutExpired:
- # force kill if needed
- self.proc.kill()
- def _wait_for_server(self, *, url: str, timeout: float):
- # run health check
- start = time.time()
- while True:
- try:
- if requests.get(url).status_code == 200:
- break
- except Exception as err:
- result = self.proc.poll()
- if result is not None and result != 0:
- raise RuntimeError("Server exited unexpectedly.") from err
- time.sleep(0.5)
- if time.time() - start > timeout:
- raise RuntimeError(
- "Server failed to start in time.") from err
- @property
- def url_root(self) -> str:
- return f"http://{self.host}:{self.port}"
- def url_for(self, *parts: str) -> str:
- return self.url_root + "/" + "/".join(parts)
- def get_client(self):
- return openai.OpenAI(
- base_url=self.url_for("v1"),
- api_key=self.DUMMY_API_KEY,
- )
- def get_async_client(self):
- return openai.AsyncOpenAI(
- base_url=self.url_for("v1"),
- api_key=self.DUMMY_API_KEY,
- max_retries=0,
- )
- def compare_two_settings(model: str,
- arg1: List[str],
- arg2: List[str],
- env1: Optional[Dict[str, str]] = None,
- env2: Optional[Dict[str, str]] = None,
- max_wait_seconds: Optional[float] = None) -> None:
- """
- Launch API server with two different sets of arguments/environments
- and compare the results of the API calls.
- Args:
- model: The model to test.
- arg1: The first set of arguments to pass to the API server.
- arg2: The second set of arguments to pass to the API server.
- env1: The first set of environment variables to pass to the API server.
- env2: The second set of environment variables to pass to the API server.
- """
- trust_remote_code = "--trust-remote-code"
- if trust_remote_code in arg1 or trust_remote_code in arg2:
- tokenizer = AutoTokenizer.from_pretrained(model,
- trust_remote_code=True)
- else:
- tokenizer = AutoTokenizer.from_pretrained(model)
- prompt = "Hello, my name is"
- token_ids = tokenizer(prompt)["input_ids"]
- results = []
- for args, env in ((arg1, env1), (arg2, env2)):
- with RemoteOpenAIServer(model,
- args,
- env_dict=env,
- max_wait_seconds=max_wait_seconds) as server:
- client = server.get_client()
- # test models list
- models = client.models.list()
- models = models.data
- served_model = models[0]
- results.append({
- "test": "models_list",
- "id": served_model.id,
- "root": served_model.root,
- })
- # test with text prompt
- completion = client.completions.create(model=model,
- prompt=prompt,
- max_tokens=5,
- temperature=0.0)
- results.append({
- "test": "single_completion",
- "text": completion.choices[0].text,
- "finish_reason": completion.choices[0].finish_reason,
- "usage": completion.usage,
- })
- # test using token IDs
- completion = client.completions.create(
- model=model,
- prompt=token_ids,
- max_tokens=5,
- temperature=0.0,
- )
- results.append({
- "test": "token_ids",
- "text": completion.choices[0].text,
- "finish_reason": completion.choices[0].finish_reason,
- "usage": completion.usage,
- })
- # test seeded random sampling
- completion = client.completions.create(model=model,
- prompt=prompt,
- max_tokens=5,
- seed=33,
- temperature=1.0)
- results.append({
- "test": "seeded_sampling",
- "text": completion.choices[0].text,
- "finish_reason": completion.choices[0].finish_reason,
- "usage": completion.usage,
- })
- # test seeded random sampling with multiple prompts
- completion = client.completions.create(model=model,
- prompt=[prompt, prompt],
- max_tokens=5,
- seed=33,
- temperature=1.0)
- results.append({
- "test":
- "seeded_sampling",
- "text": [choice.text for choice in completion.choices],
- "finish_reason":
- [choice.finish_reason for choice in completion.choices],
- "usage":
- completion.usage,
- })
- # test simple list
- batch = client.completions.create(
- model=model,
- prompt=[prompt, prompt],
- max_tokens=5,
- temperature=0.0,
- )
- results.append({
- "test": "simple_list",
- "text0": batch.choices[0].text,
- "text1": batch.choices[1].text,
- })
- # test streaming
- batch = client.completions.create(
- model=model,
- prompt=[prompt, prompt],
- max_tokens=5,
- temperature=0.0,
- stream=True,
- )
- texts = [""] * 2
- for chunk in batch:
- assert len(chunk.choices) == 1
- choice = chunk.choices[0]
- texts[choice.index] += choice.text
- results.append({
- "test": "streaming",
- "texts": texts,
- })
- n = len(results) // 2
- arg1_results = results[:n]
- arg2_results = results[n:]
- for arg1_result, arg2_result in zip(arg1_results, arg2_results):
- assert arg1_result == arg2_result, (
- f"Results for {model=} are not the same with {arg1=} and {arg2=}. "
- f"{arg1_result=} != {arg2_result=}")
- def init_test_distributed_environment(
- tp_size: int,
- pp_size: int,
- rank: int,
- distributed_init_port: str,
- local_rank: int = -1,
- ) -> None:
- distributed_init_method = f"tcp://localhost:{distributed_init_port}"
- init_distributed_environment(
- world_size=pp_size * tp_size,
- rank=rank,
- distributed_init_method=distributed_init_method,
- local_rank=local_rank)
- ensure_model_parallel_initialized(tp_size, pp_size)
- def multi_process_parallel(
- tp_size: int,
- pp_size: int,
- test_target: Any,
- ) -> None:
- import ray
- # Using ray helps debugging the error when it failed
- # as compared to multiprocessing.
- # NOTE: We need to set working_dir for distributed tests,
- # otherwise we may get import errors on ray workers
- ray.init(runtime_env={"working_dir": APHRODITE_PATH})
- distributed_init_port = get_open_port()
- refs = []
- for rank in range(tp_size * pp_size):
- refs.append(
- test_target.remote(tp_size, pp_size, rank, distributed_init_port))
- ray.get(refs)
- ray.shutdown()
- @contextmanager
- def error_on_warning():
- """
- Within the scope of this context manager, tests will fail if any warning
- is emitted.
- """
- with warnings.catch_warnings():
- warnings.simplefilter("error")
- yield
- @_nvml()
- def wait_for_gpu_memory_to_clear(devices: List[int],
- threshold_bytes: int,
- timeout_s: float = 120) -> None:
- # Use nvml instead of pytorch to reduce measurement error from torch cuda
- # context.
- start_time = time.time()
- while True:
- output: Dict[int, str] = {}
- output_raw: Dict[int, float] = {}
- for device in devices:
- if is_hip():
- dev_handle = amdsmi_get_processor_handles()[device]
- mem_info = amdsmi_get_gpu_vram_usage(dev_handle)
- gb_used = mem_info["vram_used"] / 2**10
- else:
- dev_handle = nvmlDeviceGetHandleByIndex(device)
- mem_info = nvmlDeviceGetMemoryInfo(dev_handle)
- gb_used = mem_info.used / 2**30
- output_raw[device] = gb_used
- output[device] = f'{gb_used:.02f}'
- print('gpu memory used (GB): ', end='')
- for k, v in output.items():
- print(f'{k}={v}; ', end='')
- print('')
- dur_s = time.time() - start_time
- if all(v <= (threshold_bytes / 2**30) for v in output_raw.values()):
- print(f'Done waiting for free GPU memory on devices {devices=} '
- f'({threshold_bytes/2**30=}) {dur_s=:.02f}')
- break
- if dur_s >= timeout_s:
- raise ValueError(f'Memory of devices {devices=} not free after '
- f'{dur_s=:.02f} ({threshold_bytes/2**30=})')
- time.sleep(5)
- _P = ParamSpec("_P")
- def fork_new_process_for_each_test(
- f: Callable[_P, None]) -> Callable[_P, None]:
- """Decorator to fork a new process for each test function.
- """
- @functools.wraps(f)
- def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> None:
- # Make the process the leader of its own process group
- # to avoid sending SIGTERM to the parent process
- os.setpgrp()
- from _pytest.outcomes import Skipped
- pid = os.fork()
- print(f"Fork a new process to run a test {pid}")
- if pid == 0:
- try:
- f(*args, **kwargs)
- except Skipped as e:
- # convert Skipped to exit code 0
- print(str(e))
- os._exit(0)
- except Exception:
- import traceback
- traceback.print_exc()
- os._exit(1)
- else:
- os._exit(0)
- else:
- pgid = os.getpgid(pid)
- _pid, _exitcode = os.waitpid(pid, 0)
- # ignore SIGTERM signal itself
- old_signal_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
- # kill all child processes
- os.killpg(pgid, signal.SIGTERM)
- # restore the signal handler
- signal.signal(signal.SIGTERM, old_signal_handler)
- assert _exitcode == 0, (f"function {f} failed when called with"
- f" args {args} and kwargs {kwargs}")
- return wrapper
- async def completions_with_server_args(
- prompts: List[str],
- model_name: str,
- server_cli_args: List[str],
- num_logprobs: Optional[int],
- max_wait_seconds: int = 240,
- ) -> Completion:
- '''Construct a remote OpenAI server, obtain an async client to the
- server & invoke the completions API to obtain completions.
- Args:
- prompts: test prompts
- model_name: model to spin up on the Aphrodite server
- server_cli_args: CLI args for starting the server
- num_logprobs: Number of logprobs to report (or `None`)
- max_wait_seconds: timeout interval for bringing up server.
- Default: 240sec
- Returns:
- OpenAI Completion instance
- '''
- outputs = None
- with RemoteOpenAIServer(model_name,
- server_cli_args,
- max_wait_seconds=max_wait_seconds) as server:
- client = server.get_async_client()
- outputs = await client.completions.create(model=model_name,
- prompt=prompts,
- temperature=0,
- stream=False,
- max_tokens=5,
- logprobs=num_logprobs)
- assert outputs is not None
- return outputs
- def get_client_text_generations(completions: Completion) -> List[str]:
- '''Extract generated tokens from the output of a
- request made to an Open-AI-protocol completions endpoint.
- '''
- return [x.text for x in completions.choices]
- def get_client_text_logprob_generations(
- completions: Completion) -> List[TextTextLogprobs]:
- '''Operates on the output of a request made to an Open-AI-protocol
- completions endpoint; obtains top-rank logprobs for each token in
- each :class:`SequenceGroup`
- '''
- text_generations = get_client_text_generations(completions)
- text = ''.join(text_generations)
- return [(text_generations, text,
- (None if x.logprobs is None else x.logprobs.top_logprobs))
- for x in completions.choices]
|