test_async_aphrodite.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import asyncio
  2. import os
  3. import uuid
  4. from asyncio import CancelledError
  5. from copy import copy
  6. from dataclasses import dataclass
  7. from typing import List, Optional
  8. import pytest
  9. import pytest_asyncio
  10. import torch
  11. from aphrodite import SamplingParams
  12. from aphrodite.common.config import ParallelConfig
  13. from aphrodite.common.outputs import RequestOutput as RealRequestOutput
  14. from aphrodite.common.sampling_params import RequestOutputKind
  15. from aphrodite.engine.async_aphrodite import AsyncAphrodite, AsyncEngineArgs
  16. from ..conftest import cleanup
  17. from ..utils import wait_for_gpu_memory_to_clear
  18. @dataclass
  19. class RequestOutput:
  20. request_id: int
  21. finished: bool = False
  22. class MockEngine:
  23. def __init__(self):
  24. self.step_calls = 0
  25. self.add_request_calls = 0
  26. self.abort_request_calls = 0
  27. self.request_id = None
  28. # Ugly, remove dependency when possible
  29. self.parallel_config = ParallelConfig(1, 1, False)
  30. async def step_async(self, virtual_engine):
  31. # PP size is 1, ignore virtual engine
  32. self.step_calls += 1
  33. return [RequestOutput(
  34. request_id=self.request_id)] if self.request_id else []
  35. async def process_model_inputs_async(self, *args, **kwargs):
  36. pass
  37. async def stop_remote_worker_execution_loop_async(self):
  38. pass
  39. def generate(self, request_id):
  40. self.request_id = request_id
  41. def stop_generating(self):
  42. self.request_id = None
  43. def add_request(self, **kwargs):
  44. del kwargs # Unused
  45. self.add_request_calls += 1
  46. print(f'Request calls: {self.add_request_calls}')
  47. async def add_request_async(self, **kwargs):
  48. self.add_request_calls += 1
  49. return
  50. def abort_request(self, request_id):
  51. del request_id # Unused
  52. self.abort_request_calls += 1
  53. def has_unfinished_requests(self):
  54. return self.request_id is not None
  55. def has_unfinished_requests_for_virtual_engine(self, virtual_engine):
  56. return self.request_id is not None
  57. class MockAsyncAphrodite(AsyncAphrodite):
  58. _engine_class = MockEngine
  59. @pytest.mark.asyncio
  60. async def test_new_requests_event():
  61. engine = MockAsyncAphrodite(worker_use_ray=False)
  62. engine.start_background_loop()
  63. await asyncio.sleep(0.01)
  64. assert engine.engine.step_calls == 0
  65. await engine.add_request("1", "", None)
  66. await asyncio.sleep(0.01)
  67. assert engine.engine.add_request_calls == 1
  68. assert engine.engine.step_calls == 1
  69. await engine.add_request("2", "", None)
  70. engine.engine.generate("2")
  71. await asyncio.sleep(0)
  72. await asyncio.sleep(0)
  73. await asyncio.sleep(0)
  74. assert engine.engine.add_request_calls == 2
  75. assert engine.engine.step_calls >= 2
  76. await asyncio.sleep(0.001)
  77. assert engine.engine.step_calls >= 3
  78. engine.engine.stop_generating()
  79. await asyncio.sleep(0.001)
  80. old_step_calls = engine.engine.step_calls
  81. await asyncio.sleep(0.001)
  82. assert engine.engine.step_calls == old_step_calls
  83. await engine.add_request("3", "", None)
  84. await asyncio.sleep(0.01)
  85. assert engine.engine.add_request_calls == 3
  86. assert engine.engine.step_calls == old_step_calls + 1
  87. await asyncio.sleep(0.01)
  88. assert engine.engine.add_request_calls == 3
  89. assert engine.engine.step_calls == old_step_calls + 1
  90. engine = MockAsyncAphrodite(worker_use_ray=True)
  91. assert engine.get_model_config() is not None
  92. assert engine.get_tokenizer() is not None
  93. assert engine.get_decoding_config() is not None
  94. def start_engine():
  95. wait_for_gpu_memory_to_clear(
  96. devices=list(range(torch.cuda.device_count())),
  97. threshold_bytes=2 * 2**30,
  98. timeout_s=60,
  99. )
  100. num_scheduler_steps = int(os.getenv("NUM_SCHEDULER_STEPS", "1"))
  101. print(f"Starting engine with num_scheduler_steps={num_scheduler_steps}")
  102. return AsyncAphrodite.from_engine_args(
  103. AsyncEngineArgs(model="facebook/opt-125m",
  104. enforce_eager=True,
  105. num_scheduler_steps=num_scheduler_steps))
  106. def uid() -> str:
  107. return str(uuid.uuid4())
  108. @pytest_asyncio.fixture(scope="module")
  109. async def async_engine():
  110. engine = await asyncio.get_event_loop().run_in_executor(executor=None,
  111. func=start_engine)
  112. try:
  113. yield engine
  114. finally:
  115. engine.shutdown_background_loop()
  116. del engine
  117. await asyncio.sleep(0.1)
  118. cleanup()
  119. @pytest.fixture()
  120. def should_do_global_cleanup_after_test(request) -> bool:
  121. # So we can share the async engine fixture between these tests
  122. return False
  123. @pytest.mark.asyncio(scope="module")
  124. @pytest.mark.parametrize("stop", [None, ["a stop string"]])
  125. async def test_asyncio_run(async_engine, stop):
  126. scheduler_config = await async_engine.get_scheduler_config()
  127. num_scheduler_steps = scheduler_config.num_scheduler_steps
  128. async def run(prompt: str):
  129. sampling_params = SamplingParams(
  130. temperature=0,
  131. max_tokens=32,
  132. min_tokens=32,
  133. stop=stop,
  134. )
  135. output_count = 0
  136. final_output = None
  137. async for output in async_engine.generate(prompt,
  138. sampling_params,
  139. request_id=uid()):
  140. output_count += 1
  141. final_output = output
  142. return final_output, output_count
  143. results = await asyncio.gather(
  144. run("test0"),
  145. run("test0"),
  146. )
  147. assert len(results) == 2
  148. first, second = results
  149. # remove nondeterministic fields for comparison
  150. first[0].metrics = None
  151. second[0].metrics = None
  152. first[0].request_id = None
  153. second[0].request_id = None
  154. assert str(first) == str(second)
  155. output_count = results[0][1]
  156. if num_scheduler_steps == 1:
  157. assert output_count == 32
  158. else:
  159. assert 1 < output_count < 32
  160. @pytest.mark.asyncio(scope="module")
  161. @pytest.mark.parametrize("stop", [None, ["a stop string"]])
  162. async def test_output_kinds(async_engine, stop):
  163. """Test that output_kind works as expected and that
  164. results are equivalent across different kinds."""
  165. scheduler_config = await async_engine.get_scheduler_config()
  166. num_scheduler_steps = scheduler_config.num_scheduler_steps
  167. sampling_params = SamplingParams(
  168. temperature=0,
  169. max_tokens=32,
  170. min_tokens=32,
  171. stop=stop,
  172. )
  173. async def run(prompt: str, kind: RequestOutputKind):
  174. params = copy(sampling_params)
  175. params.output_kind = kind
  176. output_count = 0
  177. final_output = None
  178. async for output in async_engine.generate(prompt,
  179. params,
  180. request_id=uid()):
  181. output_count += 1
  182. final_output = output
  183. assert final_output is not None
  184. assert final_output.finished
  185. return (final_output.prompt_token_ids,
  186. final_output.outputs[0].token_ids,
  187. final_output.outputs[0].text, output_count)
  188. async def run_deltas(prompt: str):
  189. params = copy(sampling_params)
  190. params.output_kind = RequestOutputKind.DELTA
  191. prompt_tokens = None
  192. output_tokens: List[int] = []
  193. output_text = ""
  194. output_count = 0
  195. final_output = 0
  196. async for output in async_engine.generate(prompt,
  197. params,
  198. request_id=uid()):
  199. token_ids = output.outputs[0].token_ids
  200. text = output.outputs[0].text
  201. final_output = output
  202. # Ensure we get prompt ids iff we haven't yet received output tokens
  203. if output_tokens:
  204. assert 1 <= len(token_ids) <= num_scheduler_steps
  205. assert stop or text
  206. assert not output.prompt_token_ids
  207. else:
  208. assert output.prompt_token_ids
  209. prompt_tokens = output.prompt_token_ids
  210. output_tokens.extend(token_ids)
  211. output_text += text
  212. output_count += 1
  213. assert final_output is not None
  214. assert final_output.finished
  215. return prompt_tokens, output_tokens, output_text, output_count
  216. results = await asyncio.gather(
  217. run("common input prompt", RequestOutputKind.CUMULATIVE),
  218. run("common input prompt", RequestOutputKind.FINAL_ONLY),
  219. run_deltas("common input prompt"))
  220. # Make sure outputs are the same
  221. prompt_set = set(tuple(prompt_ids) for prompt_ids, _, _, _ in results)
  222. assert len(prompt_set) == 1
  223. text_set = set(text for _, _, text, _ in results)
  224. assert len(text_set) == 1
  225. tokens_set = set(tuple(ids) for _, ids, _, _ in results)
  226. assert len(tokens_set) == 1
  227. cumulative, final, deltas = results
  228. # output message counts
  229. assert cumulative[3] == deltas[3]
  230. if num_scheduler_steps == 1:
  231. assert cumulative[3] == 32
  232. else:
  233. assert 1 < cumulative[3] < 32
  234. assert final[3] == 1
  235. @pytest.mark.asyncio(scope="module")
  236. @pytest.mark.parametrize("stop", [None, ["a stop string"]])
  237. async def test_cancellation(async_engine, stop):
  238. scheduler_config = await async_engine.get_scheduler_config()
  239. num_scheduler_steps = scheduler_config.num_scheduler_steps
  240. sampling_params = SamplingParams(
  241. temperature=0,
  242. min_tokens=13,
  243. max_tokens=13,
  244. stop=stop,
  245. )
  246. stop_at = 5 if num_scheduler_steps == 1 else 1
  247. request_id = uid()
  248. i = 0
  249. with pytest.raises(CancelledError):
  250. async for output in async_engine.generate("test2",
  251. sampling_params,
  252. request_id=request_id):
  253. assert not output.finished
  254. i += 1
  255. if i == stop_at:
  256. await async_engine.abort(request_id)
  257. assert i == stop_at
  258. @pytest.mark.asyncio(scope="module")
  259. @pytest.mark.parametrize("stop", [None, ["a stop string"]])
  260. async def test_delayed_generator(async_engine, stop):
  261. scheduler_config = await async_engine.get_scheduler_config()
  262. if scheduler_config.num_scheduler_steps != 1:
  263. pytest.skip("no need to test this one with multistep")
  264. sampling_params = SamplingParams(
  265. temperature=0,
  266. min_tokens=10,
  267. max_tokens=10,
  268. stop=stop,
  269. )
  270. stream = async_engine.generate("test3", sampling_params, request_id=uid())
  271. i = 0
  272. final_output: Optional[RealRequestOutput] = None
  273. async for output in stream:
  274. final_output = output
  275. if i == 0:
  276. # wait for generation to complete before consuming
  277. # the remaining messages
  278. await asyncio.sleep(1)
  279. if i < 9:
  280. assert not output.finished
  281. i += 1
  282. assert i == 10
  283. assert final_output is not None
  284. assert len(final_output.outputs[0].token_ids) == 10
  285. assert final_output.finished