# cpu_executor.py
  1. import os
  2. from typing import Dict, List, Set
  3. import torch
  4. from loguru import logger
  5. from aphrodite.common.config import CacheConfig, ModelConfig, SchedulerConfig
  6. from aphrodite.common.sequence import SamplerOutput, SequenceGroupMetadata
  7. from aphrodite.common.utils import (get_distributed_init_method, get_ip,
  8. get_open_port, make_async)
  9. from aphrodite.executor.executor_base import ExecutorBase
  10. from aphrodite.lora.request import LoRARequest
  11. class CPUExecutor(ExecutorBase):
  12. def _init_executor(self) -> None:
  13. assert self.device_config.device_type == "cpu"
  14. assert self.lora_config is None, "cpu backend doesn't support LoRA"
  15. self.model_config = _verify_and_get_model_config(self.model_config)
  16. self.cache_config = _verify_and_get_cache_config(self.cache_config)
  17. self.scheduler_config = _verify_and_get_scheduler_config(
  18. self.scheduler_config)
  19. # Instantiate the worker and load the model to CPU.
  20. self._init_worker()
  21. def _init_worker(self):
  22. from aphrodite.task_handler.cpu_worker import CPUWorker
  23. assert (self.parallel_config.world_size == 1
  24. ), "CPUExecutor only supports single CPU socket currently."
  25. distributed_init_method = get_distributed_init_method(
  26. get_ip(), get_open_port())
  27. self.driver_worker = CPUWorker(
  28. model_config=self.model_config,
  29. parallel_config=self.parallel_config,
  30. scheduler_config=self.scheduler_config,
  31. device_config=self.device_config,
  32. cache_config=self.cache_config,
  33. local_rank=0,
  34. rank=0,
  35. distributed_init_method=distributed_init_method,
  36. lora_config=self.lora_config,
  37. kv_cache_dtype=self.cache_config.cache_dtype,
  38. is_driver_worker=True,
  39. )
  40. self.driver_worker.init_device()
  41. self.driver_worker.load_model()
  42. def determine_num_available_blocks(self) -> tuple[int, int]:
  43. """Determine the number of available KV blocks by invoking the
  44. underlying worker.
  45. """
  46. return self.driver_worker.determine_num_available_blocks()
  47. def initialize_cache(self, num_gpu_blocks: int,
  48. num_cpu_blocks: int) -> None:
  49. """Initialize the KV cache by invoking the underlying worker.
  50. """
  51. # NOTE: We log here to avoid multiple logs when number of workers is
  52. # greater than one. We could log in the engine, but not all executors
  53. # have GPUs.
  54. # NOTE: `cpu block` for CPU backend is located on CPU memory but is
  55. # referred as `gpu block`. Because we want to reuse the existing block
  56. # management procedure.
  57. logger.info(f"# CPU blocks: {num_gpu_blocks}")
  58. self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
  59. def execute_model(self,
  60. seq_group_metadata_list: List[SequenceGroupMetadata],
  61. blocks_to_swap_in: Dict[int, int],
  62. blocks_to_swap_out: Dict[int, int],
  63. blocks_to_copy: Dict[int, List[int]],
  64. num_lookahead_slots: int) -> List[SamplerOutput]:
  65. output = self.driver_worker.execute_model(
  66. seq_group_metadata_list=seq_group_metadata_list,
  67. blocks_to_swap_in=blocks_to_swap_in,
  68. blocks_to_swap_out=blocks_to_swap_out,
  69. blocks_to_copy=blocks_to_copy,
  70. num_lookahead_slots=num_lookahead_slots,
  71. )
  72. return output
  73. async def execute_model_async(
  74. self,
  75. seq_group_metadata_list: List[SequenceGroupMetadata],
  76. blocks_to_swap_in: Dict[int, int],
  77. blocks_to_swap_out: Dict[int, int],
  78. blocks_to_copy: Dict[int, List[int]],
  79. num_lookahead_slots: int,
  80. ) -> SamplerOutput:
  81. output = await make_async(self.driver_worker.execute_model)(
  82. seq_group_metadata_list=seq_group_metadata_list,
  83. blocks_to_swap_in=blocks_to_swap_in,
  84. blocks_to_swap_out=blocks_to_swap_out,
  85. blocks_to_copy=blocks_to_copy,
  86. num_lookahead_slots=num_lookahead_slots,
  87. )
  88. return output
  89. def add_lora(self, lora_request: LoRARequest) -> bool:
  90. return self.driver_worker.add_lora(lora_request)
  91. def remove_lora(self, lora_id: int) -> bool:
  92. return self.driver_worker.remove_lora(lora_id)
  93. def list_loras(self) -> Set[int]:
  94. return self.driver_worker.list_loras()
  95. def check_health(self) -> None:
  96. # CPUExecutor will always be healthy as long as
  97. # it's running.
  98. return
  99. def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
  100. if config.dtype == torch.float16:
  101. logger.warning("float16 is not supported on CPU, casting to bfloat16.")
  102. config.dtype = torch.bfloat16
  103. if not config.enforce_eager:
  104. logger.warning(
  105. "CUDA graph is not supported on CPU, fallback to the eager "
  106. "mode.")
  107. config.enforce_eager = True
  108. return config
  109. def _verify_and_get_scheduler_config(
  110. config: SchedulerConfig) -> SchedulerConfig:
  111. if config.chunked_prefill_enabled:
  112. logger.warning("Chunked prefill is not supported on CPU, disable it.")
  113. config.chunked_prefill_enabled = False
  114. return config
  115. def _verify_and_get_cache_config(config: CacheConfig) -> CacheConfig:
  116. _GB = 1 << 30
  117. if config.context_shift:
  118. logger.warning("Prefix caching is not supported on CPU, disable it.")
  119. config.context_shift = False
  120. kv_cache_space_str = os.getenv("APHRODITE_CPU_KVCACHE_SPACE", "0")
  121. kv_cache_space = int(kv_cache_space_str)
  122. if kv_cache_space >= 0:
  123. if kv_cache_space == 0:
  124. config.cpu_kvcache_space_bytes = 4 * _GB # type: ignore
  125. logger.warning(
  126. "Environment variable APHRODITE_CPU_KVCACHE_SPACE (GB) "
  127. "for CPU backend is not set, using 4 by default.")
  128. else:
  129. config.cpu_kvcache_space_bytes = kv_cache_space * _GB # type: ignore
  130. else:
  131. raise RuntimeError(
  132. "Invalid environment variable APHRODITE_CPU_KVCACHE_SPACE"
  133. f" {kv_cache_space}, expect a positive integer value.")
  134. return config