ray_tokenizer_group.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import asyncio
  2. import os
  3. from typing import List, Optional
  4. try:
  5. from ray.exceptions import ActorDiedError
  6. except ImportError:
  7. # For older versions of Ray
  8. from ray.exceptions import RayActorError as ActorDiedError
  9. from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
  10. from transformers import PreTrainedTokenizer
  11. from loguru import logger
  12. from aphrodite.common.config import TokenizerPoolConfig
  13. from aphrodite.executor.ray_utils import ray
  14. from aphrodite.lora.request import LoRARequest
  15. from aphrodite.transformers_utils.tokenizer_group.base_tokenizer_group import \
  16. BaseTokenizerGroup
  17. from aphrodite.transformers_utils.tokenizer_group.tokenizer_group import \
  18. TokenizerGroup
  19. class RayTokenizerGroupPool(BaseTokenizerGroup):
  20. """A Ray-based pool of TokenizerGroups for async tokenization."""
  21. # Class to use for workers making up the pool.
  22. _worker_cls = TokenizerGroup
  23. @classmethod
  24. def from_config(cls, tokenizer_pool_config: Optional[TokenizerPoolConfig],
  25. **init_kwargs) -> "RayTokenizerGroupPool":
  26. if not tokenizer_pool_config:
  27. raise ValueError("tokenizer_pool_config must not be None.")
  28. ray_actor_options = (tokenizer_pool_config.extra_config or {
  29. "num_cpus": 0
  30. })
  31. ray_actor_options.setdefault(
  32. "scheduling_strategy",
  33. NodeAffinitySchedulingStrategy(
  34. node_id=ray.get_runtime_context().get_node_id(), soft=True))
  35. # Carry over the env vars to the actors.
  36. # This is necessary for API keys and such.
  37. ray_actor_options.setdefault("runtime_env", {})
  38. _carry_over_env_vars_to_runtime_env(ray_actor_options["runtime_env"])
  39. init_kwargs["num_actors"] = tokenizer_pool_config.pool_size
  40. init_kwargs["ray_actor_options"] = ray_actor_options
  41. return cls(**init_kwargs)
  42. def __init__(self, tokenizer_id: str, enable_lora: bool, max_num_seqs: int,
  43. max_input_length: Optional[int], num_actors: int,
  44. ray_actor_options: dict, **tokenizer_config):
  45. # Store a local copy of the TokenizerGroup for quick access
  46. # to underlying HF tokenizers.
  47. self._tokenizer_config = {
  48. "tokenizer_id": tokenizer_id,
  49. "enable_lora": enable_lora,
  50. "max_num_seqs": max_num_seqs,
  51. "max_input_length": max_input_length,
  52. **tokenizer_config
  53. }
  54. self._local_tokenizer_group = self._worker_cls(
  55. **self._tokenizer_config, )
  56. self._ray_tokenizer_group_cls = ray.remote(
  57. self._worker_cls).options(**ray_actor_options)
  58. self.tokenizer_actors = [self._init_actor() for _ in range(num_actors)]
  59. self._idle_actors: Optional[asyncio.Queue] = None
  60. # If set, actor is unhealthy. Will reraise on the next
  61. # check_health call.
  62. self._exception: Optional[ActorDiedError] = None
  63. def _init_actor(self) -> ray.ObjectRef:
  64. return self._ray_tokenizer_group_cls.remote(**self._tokenizer_config)
  65. @property
  66. def pool_size(self) -> int:
  67. return len(self.tokenizer_actors)
  68. def ping(self):
  69. return ray.get(
  70. [actor.ping.remote() for actor in self.tokenizer_actors])
  71. def _ensure_queue_initialized(self):
  72. if self._idle_actors is None:
  73. self._idle_actors = asyncio.Queue()
  74. for actor in self.tokenizer_actors:
  75. self._idle_actors.put_nowait(actor)
  76. def _finalize_encode(self, actor: ray.ObjectRef,
  77. original_actor: ray.ObjectRef, actor_is_alive: bool):
  78. assert self._idle_actors is not None
  79. # Cleanup the dead actor.
  80. if not actor_is_alive or original_actor is not actor:
  81. self.tokenizer_actors.remove(original_actor)
  82. if actor_is_alive:
  83. # Put the actor back in the queue.
  84. # This is done in a finally block to ensure that the actor is
  85. # always put back in the queue, even if an exception/cancellation
  86. # is raised.
  87. self._idle_actors.put_nowait(actor)
  88. # Add back the new actor.
  89. if original_actor is not actor:
  90. self.tokenizer_actors.append(actor)
  91. def encode(self,
  92. prompt: str,
  93. request_id: Optional[str] = None,
  94. lora_request: Optional[LoRARequest] = None) -> List[int]:
  95. """Encode a prompt using the tokenizer group.
  96. We pick an idle actor and use it to encode the prompt.
  97. The actor is then put back in the queue for future use.
  98. This is blocking.
  99. """
  100. self.check_health()
  101. self._ensure_queue_initialized()
  102. if self._idle_actors.empty():
  103. raise RuntimeError("No idle actors available.")
  104. actor = self._idle_actors.get_nowait()
  105. actor_is_alive = True
  106. original_actor = actor
  107. try:
  108. ret = ray.get(
  109. actor.encode.remote(request_id=request_id,
  110. prompt=prompt,
  111. lora_request=lora_request))
  112. except ActorDiedError as e:
  113. # If the actor is dead, we first try to reinitialize it.
  114. logger.warning(
  115. f"{actor} died with ActorDiedError, reinitializing.",
  116. exc_info=e)
  117. actor = self._init_actor()
  118. try:
  119. ret = ray.get(
  120. actor.encode.remote(request_id=request_id,
  121. prompt=prompt,
  122. lora_request=lora_request))
  123. except ActorDiedError as e:
  124. logger.error(f"{actor} died for second time in a row, marking "
  125. "RayTokenizerGroupPool as unhealthy.")
  126. actor_is_alive = False
  127. if not self._exception:
  128. self._exception = e
  129. self.check_health()
  130. finally:
  131. self._finalize_encode(actor, original_actor, actor_is_alive)
  132. return ret
  133. async def encode_async(
  134. self,
  135. prompt: str,
  136. request_id: Optional[str] = None,
  137. lora_request: Optional[LoRARequest] = None) -> List[int]:
  138. """Encode a prompt using the tokenizer group.
  139. We pick an idle actor and use it to encode the prompt.
  140. If there are no idle actors, we wait until one becomes
  141. available.
  142. The actor is then put back in the queue for future use.
  143. This is non-blocking.
  144. """
  145. self.check_health()
  146. self._ensure_queue_initialized()
  147. assert self._idle_actors is not None
  148. actor = await self._idle_actors.get()
  149. actor_is_alive = True
  150. original_actor = actor
  151. try:
  152. ret = await actor.encode.remote(request_id=request_id,
  153. prompt=prompt,
  154. lora_request=lora_request)
  155. except ActorDiedError as e:
  156. # If the actor is dead, we first try to reinitialize it.
  157. logger.warning(
  158. f"{actor} died with ActorDiedError, reinitializing.",
  159. exc_info=e)
  160. actor = self._init_actor()
  161. try:
  162. ret = await actor.encode.remote(request_id=request_id,
  163. prompt=prompt,
  164. lora_request=lora_request)
  165. except ActorDiedError as e:
  166. logger.error(f"{actor} died for second time in a row, marking "
  167. "RayTokenizerGroupPool as unhealthy.")
  168. actor_is_alive = False
  169. if not self._exception:
  170. self._exception = e
  171. self.check_health()
  172. finally:
  173. self._finalize_encode(actor, original_actor, actor_is_alive)
  174. return ret
  175. def get_max_input_len(self,
  176. lora_request: Optional[LoRARequest] = None
  177. ) -> Optional[int]:
  178. """Get the maximum input length for the LoRA request."""
  179. return self._local_tokenizer_group.get_max_input_len(lora_request)
  180. def get_lora_tokenizer(
  181. self,
  182. lora_request: Optional[LoRARequest] = None
  183. ) -> "PreTrainedTokenizer":
  184. return self._local_tokenizer_group.get_lora_tokenizer(lora_request)
  185. async def get_lora_tokenizer_async(
  186. self,
  187. lora_request: Optional[LoRARequest] = None
  188. ) -> "PreTrainedTokenizer":
  189. return await self._local_tokenizer_group.get_lora_tokenizer_async(
  190. lora_request)
  191. def check_health(self):
  192. if self._exception:
  193. raise RuntimeError(
  194. "TokenizerGroupPool is unhealthy.") from self._exception
  195. def _carry_over_env_vars_to_runtime_env(runtime_env: dict) -> None:
  196. """Copy over all current process environment variables to the runtime_env.
  197. The variables in runtime_env will take precedence over the current process
  198. environment variables.
  199. runtime_env will be modified in place."""
  200. env_vars = os.environ.copy()
  201. runtime_env.setdefault("env_vars", {})
  202. env_vars.update(runtime_env["env_vars"])
  203. runtime_env["env_vars"] = env_vars