cpu_executor.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import os
  2. from typing import List, Set, Tuple
  3. import torch
  4. from loguru import logger
  5. from aphrodite.common.config import CacheConfig, ModelConfig, SchedulerConfig
  6. from aphrodite.common.sequence import ExecuteModelRequest, SamplerOutput
  7. from aphrodite.common.utils import (get_distributed_init_method, get_ip,
  8. get_open_port, make_async)
  9. from aphrodite.executor.executor_base import ExecutorAsyncBase, ExecutorBase
  10. from aphrodite.lora.request import LoRARequest
  11. class CPUExecutor(ExecutorBase):
  12. def _init_executor(self) -> None:
  13. assert self.device_config.device_type == "cpu"
  14. assert self.lora_config is None, "cpu backend doesn't support LoRA"
  15. self.model_config = _verify_and_get_model_config(self.model_config)
  16. self.cache_config = _verify_and_get_cache_config(self.cache_config)
  17. self.scheduler_config = _verify_and_get_scheduler_config(
  18. self.scheduler_config)
  19. # Instantiate the worker and load the model to CPU.
  20. self._init_worker()
  21. def _init_worker(self):
  22. from aphrodite.task_handler.cpu_worker import CPUWorker
  23. assert self.parallel_config.world_size == 1, (
  24. "CPUExecutor only supports single CPU socket currently.")
  25. distributed_init_method = get_distributed_init_method(
  26. get_ip(), get_open_port())
  27. self.driver_worker = CPUWorker(
  28. model_config=self.model_config,
  29. parallel_config=self.parallel_config,
  30. scheduler_config=self.scheduler_config,
  31. device_config=self.device_config,
  32. cache_config=self.cache_config,
  33. load_config=self.load_config,
  34. local_rank=0,
  35. rank=0,
  36. distributed_init_method=distributed_init_method,
  37. lora_config=self.lora_config,
  38. vision_language_config=self.vision_language_config,
  39. kv_cache_dtype=self.cache_config.cache_dtype,
  40. is_driver_worker=True,
  41. )
  42. self.driver_worker.init_device()
  43. self.driver_worker.load_model()
  44. def determine_num_available_blocks(self) -> Tuple[int, int]:
  45. """Determine the number of available KV blocks by invoking the
  46. underlying worker.
  47. """
  48. return self.driver_worker.determine_num_available_blocks()
  49. def initialize_cache(self, num_gpu_blocks: int,
  50. num_cpu_blocks: int) -> None:
  51. """Initialize the KV cache by invoking the underlying worker.
  52. """
  53. # NOTE: We log here to avoid multiple logs when number of workers is
  54. # greater than one. We could log in the engine, but not all executors
  55. # have GPUs.
  56. # NOTE: `cpu block` for CPU backend is located on CPU memory but is
  57. # referred as `gpu block`. Because we want to reuse the existing block
  58. # management procedure.
  59. logger.info(f"# CPU blocks: {num_gpu_blocks}")
  60. self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
  61. def execute_model(
  62. self,
  63. execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
  64. output = self.driver_worker.execute_model(execute_model_req)
  65. return output
  66. def add_lora(self, lora_request: LoRARequest) -> bool:
  67. return self.driver_worker.add_lora(lora_request)
  68. def remove_lora(self, lora_id: int) -> bool:
  69. return self.driver_worker.remove_lora(lora_id)
  70. def list_loras(self) -> Set[int]:
  71. return self.driver_worker.list_loras()
  72. def check_health(self) -> None:
  73. # CPUExecutor will always be healthy as long as
  74. # it's running.
  75. return
  76. class CPUExecutorAsync(CPUExecutor, ExecutorAsyncBase):
  77. async def execute_model_async(
  78. self,
  79. execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
  80. output = await make_async(self.driver_worker.execute_model
  81. )(execute_model_req=execute_model_req, )
  82. return output
  83. async def check_health_async(self) -> None:
  84. # CPUExecutor will always be healthy as long as
  85. # it's running.
  86. return
  87. def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
  88. if config.dtype == torch.float16:
  89. logger.warning("float16 is not supported on CPU, casting to bfloat16.")
  90. config.dtype = torch.bfloat16
  91. if not config.enforce_eager:
  92. logger.warning(
  93. "CUDA graph is not supported on CPU, fallback to the eager "
  94. "mode.")
  95. config.enforce_eager = True
  96. return config
  97. def _verify_and_get_scheduler_config(
  98. config: SchedulerConfig) -> SchedulerConfig:
  99. if config.chunked_prefill_enabled:
  100. logger.warning("Chunked prefill is not supported on CPU, disable it.")
  101. config.chunked_prefill_enabled = False
  102. return config
  103. def _verify_and_get_cache_config(config: CacheConfig) -> CacheConfig:
  104. _GB = 1 << 30
  105. if config.enable_prefix_caching:
  106. logger.warning("Prefix caching is not supported on CPU, disable it.")
  107. config.enable_prefix_caching = False
  108. kv_cache_space_str = os.getenv("APHRODITE_CPU_KVCACHE_SPACE", "0")
  109. kv_cache_space = int(kv_cache_space_str)
  110. if kv_cache_space >= 0:
  111. if kv_cache_space == 0:
  112. config.cpu_kvcache_space_bytes = 4 * _GB # type: ignore
  113. logger.warning(
  114. "Environment variable APHRODITE_CPU_KVCACHE_SPACE (GB) "
  115. "for CPU backend is not set, using 4 by default.")
  116. else:
  117. config.cpu_kvcache_space_bytes = kv_cache_space * _GB # type: ignore
  118. else:
  119. raise RuntimeError(
  120. "Invalid environment variable APHRODITE_CPU_KVCACHE_SPACE"
  121. f" {kv_cache_space}, expect a positive integer value.")
  122. return config