xpu_worker.py

  1. """A XPU worker class."""
  2. import gc
  3. import os
  4. from typing import List, Optional, Tuple
  5. import intel_extension_for_pytorch # noqa: F401
  6. import oneccl_bindings_for_pytorch # noqa: F401
  7. import torch
  8. import torch.distributed
  9. from aphrodite.common.config import (CacheConfig, DeviceConfig, LoadConfig,
  10. LoRAConfig, ModelConfig, MultiModalConfig,
  11. ParallelConfig, PromptAdapterConfig,
  12. SchedulerConfig, SpeculativeConfig)
  13. from aphrodite.common.utils import is_xpu
  14. from aphrodite.distributed import (ensure_model_parallel_initialized,
  15. init_distributed_environment)
  16. from aphrodite.modeling import set_random_seed
  17. from aphrodite.task_handler.cache_engine import CacheEngine
  18. from aphrodite.task_handler.worker import Worker
  19. from aphrodite.task_handler.worker_base import LoraNotSupportedWorkerBase
  20. from aphrodite.task_handler.xpu_model_runner import XPUModelRunner


class XPUWorker(LoraNotSupportedWorkerBase, Worker):
    """A worker class that executes (a partition of) the model on an XPU.

    Each worker is associated with a single XPU device. The worker is
    responsible for maintaining the KV cache and executing the model on the
    XPU. In case of distributed inference, each worker is assigned a partition
    of the model.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
        load_config: LoadConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        lora_config: Optional[LoRAConfig] = None,
        multimodal_config: Optional[MultiModalConfig] = None,
        speculative_config: Optional[SpeculativeConfig] = None,
        prompt_adapter_config: Optional[PromptAdapterConfig] = None,
        is_driver_worker: bool = False,
    ) -> None:
        assert device_config.device_type == "xpu"
        assert is_xpu()

        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config
        self.load_config = load_config
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.lora_config = lora_config
        self.prompt_adapter_config = prompt_adapter_config
        self.is_driver_worker = is_driver_worker
        if parallel_config and is_driver_worker:
            assert rank % parallel_config.tensor_parallel_size == 0, \
                "Driver worker should be rank 0 of tensor parallel group."

        self.multimodal_config = multimodal_config

        self.model_runner = XPUModelRunner(  # type: ignore
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
            cache_config,
            load_config=self.load_config,
            lora_config=self.lora_config,
            kv_cache_dtype=self.cache_config.cache_dtype,
            is_driver_worker=is_driver_worker,
            multimodal_config=multimodal_config,
        )
        # Uninitialized cache engine. Will be initialized by
        # initialize_cache.
        self.cache_engine: List[CacheEngine]
        self.gpu_cache: Optional[List[List[torch.Tensor]]]

    def init_device(self) -> None:
        if self.device_config.device.type == "xpu" and is_xpu():
            self.device = torch.device(f"xpu:{self.local_rank}")
            torch.xpu.set_device(self.device)
            torch.xpu.empty_cache()
            self.init_gpu_memory = torch.xpu.get_device_properties(
                self.local_rank).total_memory
        else:
            raise RuntimeError(
                f"Unsupported device type: {self.device_config.device}")
        # Initialize the distributed environment.
        self.init_worker_distributed_environment()
        # Initialize the model.
        set_random_seed(self.model_config.seed)

    # Keep this method for the `empty_cache` and `synchronize` APIs.
    @torch.inference_mode()
    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Profiles the peak memory usage of the model to determine how many
        KV blocks may be allocated without OOMs.

        The engine will first conduct a profiling of the existing memory
        usage. Then, it calculates the maximum possible number of GPU and CPU
        blocks that can be allocated with the remaining free memory.

        .. tip::
            You may limit the usage of GPU memory
            by adjusting the `gpu_memory_utilization` parameter.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
        torch.xpu.empty_cache()

        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        self.model_runner.profile_run()

        # Calculate the number of blocks that can be allocated with the
        # profiled peak memory.
        torch.xpu.synchronize()
        used_memory = torch.xpu.memory_allocated()
        total_gpu_memory = torch.xpu.get_device_properties(
            self.local_rank).total_memory
        free_gpu_memory = total_gpu_memory - used_memory

        # NOTE: Here we assume that the other processes using the same
        # GPU did not change their memory usage during the profiling.
        peak_memory = self.init_gpu_memory - free_gpu_memory
        assert peak_memory > 0, (
            "Error in memory profiling. This happens when the GPU memory was "
            "not properly cleaned up before initializing Aphrodite.")

        cache_block_size = self.get_cache_block_size_bytes()
        num_gpu_blocks = int(
            (total_gpu_memory * self.cache_config.gpu_memory_utilization -
             peak_memory) // cache_block_size)
        num_cpu_blocks = int(self.cache_config.swap_space_bytes //
                             cache_block_size)
        num_gpu_blocks = max(num_gpu_blocks, 0)
        num_cpu_blocks = max(num_cpu_blocks, 0)
        gc.collect()
        torch.xpu.empty_cache()
        return num_gpu_blocks, num_cpu_blocks
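
    # Illustrative arithmetic for the block counts above, using hypothetical
    # numbers (not measured on any device): assuming 16 GiB of total device
    # memory, gpu_memory_utilization=0.9, a profiled peak of 6 GiB, and a
    # cache block size of 2 MiB, the formula yields
    #   num_gpu_blocks = int((16 GiB * 0.9 - 6 GiB) // 2 MiB) = 4300
    # and, with a 4 GiB swap space,
    #   num_cpu_blocks = int(4 GiB // 2 MiB) = 2048.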

    def _warm_up_model(self) -> None:
        # IPEX does not support graph capture yet.
        pass

    def init_worker_distributed_environment(self) -> None:
        """Initialize the distributed environment."""
        parallel_config = self.parallel_config
        rank = self.rank
        distributed_init_method = self.distributed_init_method

        if torch.distributed.is_initialized():
            torch_world_size = torch.distributed.get_world_size()
            if torch_world_size != parallel_config.world_size:
                raise RuntimeError(
                    "torch.distributed is already initialized but the torch "
                    "world size does not match parallel_config.world_size "
                    f"({torch_world_size} vs. {parallel_config.world_size}).")
        elif not distributed_init_method:
            raise ValueError(
                "distributed_init_method must be set if torch.distributed "
                "is not already initialized")
        else:
            # Use sockets as the default Level Zero IPC exchange backend. By
            # default, oneCCL uses `drmfd`, which needs extra dependencies
            # (libdrm and the drm headers) on the system.
            ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE",
                                                "sockets")
            ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                             str(parallel_config.world_size))
            os.environ["CCL_ZE_IPC_EXCHANGE"] = ENV_CCL_ZE_IPC_EXCHANGE
            os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
            os.environ["LOCAL_RANK"] = str(self.local_rank)
            init_distributed_environment(
                world_size=parallel_config.world_size,
                rank=rank,
                distributed_init_method=distributed_init_method,
                local_rank=self.local_rank,
                backend="ccl")

        ensure_model_parallel_initialized(
            parallel_config.tensor_parallel_size,
            parallel_config.pipeline_parallel_size)
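

# Illustrative usage sketch (not part of the original module): it shows the
# typical lifecycle of a worker, assuming the config objects have already been
# constructed elsewhere (e.g. by the engine) and that `initialize_cache` is
# inherited unchanged from the base `Worker` class.
#
#     worker = XPUWorker(
#         model_config=model_config,
#         parallel_config=parallel_config,
#         scheduler_config=scheduler_config,
#         device_config=device_config,
#         cache_config=cache_config,
#         load_config=load_config,
#         local_rank=0,
#         rank=0,
#         distributed_init_method="tcp://127.0.0.1:29500",
#         is_driver_worker=True,
#     )
#     worker.init_device()
#     num_gpu_blocks, num_cpu_blocks = worker.determine_num_available_blocks()
#     worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)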