test_tokenizer_group.py
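"""Tests for TokenizerGroup and the Ray-backed RayTokenizerGroupPool:
sync/async encoding parity with the underlying HuggingFace tokenizer,
environment variable propagation to Ray actors, and fault tolerance."""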

import asyncio
import os
import sys
from typing import List, Optional
from unittest.mock import patch

import pytest
from transformers import AutoTokenizer, PreTrainedTokenizerBase

from aphrodite.transformers_utils.tokenizer_group import (TokenizerGroup,
                                                          get_tokenizer_group)
from aphrodite.transformers_utils.tokenizer_group.ray_tokenizer_group import (
    RayTokenizerGroupPool)

from ..conftest import get_tokenizer_pool_config
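

# CustomTokenizerGroup counts encode() calls so the test below can verify
# that a user-supplied tokenizer group class is actually instantiated and
# used by get_tokenizer_group().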
class CustomTokenizerGroup(TokenizerGroup):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._i = 0

    def encode(self, *args, **kwargs):
        self._i += 1
        return super().encode(*args, **kwargs)


@pytest.mark.asyncio
@pytest.mark.parametrize("tokenizer_group_type",
                         [None, "ray", CustomTokenizerGroup])
async def test_tokenizer_group(tokenizer_group_type):
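    """Check that every tokenizer group type (local, Ray pool, and a custom
    subclass) produces the same tokens as the reference HF tokenizer, via
    both the sync and async encode paths."""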
    reference_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer_group = get_tokenizer_group(
        get_tokenizer_pool_config(tokenizer_group_type),
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=None,
    )
    assert reference_tokenizer.encode("prompt") == tokenizer_group.encode(
        request_id="request_id", prompt="prompt", lora_request=None)
    assert reference_tokenizer.encode(
        "prompt") == await tokenizer_group.encode_async(
            request_id="request_id", prompt="prompt", lora_request=None)
    assert isinstance(tokenizer_group.get_lora_tokenizer(None),
                      PreTrainedTokenizerBase)
    assert tokenizer_group.get_lora_tokenizer(
        None) == await tokenizer_group.get_lora_tokenizer_async(None)
    if tokenizer_group_type is CustomTokenizerGroup:
        assert tokenizer_group._i > 0


@pytest.mark.asyncio
@pytest.mark.parametrize("tokenizer_group_type", ["ray"])
async def test_tokenizer_group_pool(tokenizer_group_type):
    reference_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer_group_pool = get_tokenizer_group(
        get_tokenizer_pool_config(tokenizer_group_type),
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=None,
    )
    # Send multiple requests to the tokenizer group pool
    # (more than the pool size)
    # and check that all requests are processed correctly.
    num_requests = tokenizer_group_pool.pool_size * 5
    requests = [
        tokenizer_group_pool.encode_async(request_id=str(i),
                                          prompt=f"prompt {i}",
                                          lora_request=None)
        for i in range(num_requests)
    ]
    results = await asyncio.gather(*requests)
    expected_results = [
        reference_tokenizer.encode(f"prompt {i}") for i in range(num_requests)
    ]
    assert results == expected_results


@pytest.mark.asyncio
@pytest.mark.parametrize("tokenizer_group_type", ["ray"])
async def test_tokenizer_group_ray_pool_env_var_propagation(
        tokenizer_group_type):
    """Test that env vars from caller process are propagated to
    tokenizer Ray actors."""
    env_var = "MY_ENV_VAR"

    class EnvVarCheckerTokenizerGroup(TokenizerGroup):

        def ping(self):
            assert os.environ.get(env_var) == "1"
            return super().ping()

    class EnvVarCheckerRayTokenizerGroupPool(RayTokenizerGroupPool):
        _worker_cls = EnvVarCheckerTokenizerGroup

    tokenizer_pool_config = get_tokenizer_pool_config(tokenizer_group_type)
    tokenizer_pool = EnvVarCheckerRayTokenizerGroupPool.from_config(
        tokenizer_pool_config,
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=None)
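    # Without the env var set in the caller, the actor-side assertion in
    # ping() should fail.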
    with pytest.raises(AssertionError):
        tokenizer_pool.ping()
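
    # With the env var set in the caller's environment, Ray should
    # propagate it to the newly created actors.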
    with patch.dict(os.environ, {env_var: "1"}):
        tokenizer_pool_config = get_tokenizer_pool_config(tokenizer_group_type)
        tokenizer_pool = EnvVarCheckerRayTokenizerGroupPool.from_config(
            tokenizer_pool_config,
            tokenizer_id="gpt2",
            enable_lora=False,
            max_num_seqs=1,
            max_input_length=None)
        tokenizer_pool.ping()


@pytest.mark.asyncio
@pytest.mark.parametrize("tokenizer_group_type", ["ray"])
async def test_tokenizer_group_ray_pool_fault_tolerance(tokenizer_group_type):
    """Test that Ray tokenizer pool group can recover from failures and
    if that's not possible, mark itself as unhealthy."""
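
    # FailingTokenizerGroup simulates an actor crash by exiting the worker
    # process (sys.exit) on the encode() calls listed in fail_at.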
    class FailingTokenizerGroup(TokenizerGroup):

        def __init__(self,
                     *args,
                     fail_at: Optional[List[int]] = None,
                     **kwargs):
            super().__init__(*args, **kwargs)
            self.i = 0
            self.fail_at = fail_at or []

        def encode(self, *args, **kwargs):
            self.i += 1
            if self.i in self.fail_at:
                sys.exit(1)
            return super().encode(*args, **kwargs)

    class FailingRayTokenizerGroupPool(RayTokenizerGroupPool):
        _worker_cls = FailingTokenizerGroup

    # Fail at first iteration
    fail_at = [1]
    tokenizer_pool_config = get_tokenizer_pool_config(tokenizer_group_type)
    tokenizer_group_pool = FailingRayTokenizerGroupPool.from_config(
        tokenizer_pool_config,
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=None,
        fail_at=fail_at)
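    # Save the initial actor handles so we can check later that the dead
    # actor was replaced.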
    tokenizer_actors = tokenizer_group_pool.tokenizer_actors.copy()

    # Modify fail at to not fail at all (will be re-read when actor is
    # re-initialized).
    fail_at[0] = 1000

    # We should recover successfully.
    await tokenizer_group_pool.encode_async(request_id="1",
                                            prompt="prompt",
                                            lora_request=None)
    await tokenizer_group_pool.encode_async(request_id="1",
                                            prompt="prompt",
                                            lora_request=None)

    # Check that we have a new actor
    assert len(tokenizer_group_pool.tokenizer_actors) == len(tokenizer_actors)
    assert tokenizer_group_pool.tokenizer_actors != tokenizer_actors

    # Fail at first iteration
    fail_at = [1]
    tokenizer_group_pool = FailingRayTokenizerGroupPool.from_config(
        tokenizer_pool_config,
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=None,
        fail_at=fail_at)

    # We should fail after re-initialization.
    with pytest.raises(RuntimeError):
        await tokenizer_group_pool.encode_async(request_id="1",
                                                prompt="prompt",
                                                lora_request=None)
    # check_health should raise the same thing
    with pytest.raises(RuntimeError):
        tokenizer_group_pool.check_health()

    # Ensure that non-ActorDiedErrors are still propagated correctly and do
    # not cause a re-initialization.
    fail_at = []
    tokenizer_group_pool = FailingRayTokenizerGroupPool.from_config(
        tokenizer_pool_config,
        tokenizer_id="gpt2",
        enable_lora=False,
        max_num_seqs=1,
        max_input_length=2,
        fail_at=fail_at)
    tokenizer_actors = tokenizer_group_pool.tokenizer_actors.copy()

    # Prompt too long error
    with pytest.raises(ValueError):
        await tokenizer_group_pool.encode_async(request_id="1",
                                                prompt="prompt" * 100,
                                                lora_request=None)
    await tokenizer_group_pool.encode_async(request_id="1",
                                            prompt="prompt",
                                            lora_request=None)
    # Actors should stay the same.
    assert tokenizer_group_pool.tokenizer_actors == tokenizer_actors