1
0

xpu_worker.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. """A XPU worker class."""
  2. import gc
  3. import os
  4. from typing import List, Optional, Tuple
  5. import intel_extension_for_pytorch # noqa: F401
  6. import oneccl_bindings_for_pytorch # noqa: F401
  7. import torch
  8. import torch.distributed
  9. from aphrodite.common.config import (CacheConfig, DeviceConfig, LoadConfig,
  10. LoRAConfig, ModelConfig, MultiModalConfig,
  11. ParallelConfig, PromptAdapterConfig,
  12. SchedulerConfig, SpeculativeConfig)
  13. from aphrodite.common.utils import is_xpu
  14. from aphrodite.distributed import (ensure_model_parallel_initialized,
  15. init_distributed_environment)
  16. from aphrodite.modeling import set_random_seed
  17. from aphrodite.task_handler.cache_engine import CacheEngine
  18. from aphrodite.task_handler.worker import Worker
  19. from aphrodite.task_handler.worker_base import LoraNotSupportedWorkerBase
  20. from aphrodite.task_handler.xpu_model_runner import XPUModelRunner
  21. class XPUWorker(LoraNotSupportedWorkerBase, Worker):
  22. """A worker class that executes (a partition of) the model on a GPU.
  23. Each worker is associated with a single XPU device. The worker is
  24. responsible for maintaining the KV cache and executing the model on the
  25. XPU. In case of distributed inference, each worker is assigned a partition
  26. of the model.
  27. """
  28. def __init__(
  29. self,
  30. model_config: ModelConfig,
  31. parallel_config: ParallelConfig,
  32. scheduler_config: SchedulerConfig,
  33. device_config: DeviceConfig,
  34. cache_config: CacheConfig,
  35. load_config: LoadConfig,
  36. local_rank: int,
  37. rank: int,
  38. distributed_init_method: str,
  39. lora_config: Optional[LoRAConfig] = None,
  40. multimodal_config: Optional[MultiModalConfig] = None,
  41. speculative_config: Optional[SpeculativeConfig] = None,
  42. prompt_adapter_config: Optional[PromptAdapterConfig] = None,
  43. is_driver_worker: bool = False,
  44. ) -> None:
  45. assert device_config.device_type == "xpu"
  46. assert is_xpu()
  47. self.model_config = model_config
  48. self.parallel_config = parallel_config
  49. self.scheduler_config = scheduler_config
  50. self.device_config = device_config
  51. self.cache_config = cache_config
  52. self.load_config = load_config
  53. self.local_rank = local_rank
  54. self.rank = rank
  55. self.distributed_init_method = distributed_init_method
  56. self.lora_config = lora_config
  57. self.prompt_adapter_config = prompt_adapter_config
  58. self.is_driver_worker = is_driver_worker
  59. if self.is_driver_worker:
  60. assert self.rank == 0, "The driver worker must have rank 0."
  61. self.multimodal_config = multimodal_config
  62. self.model_runner = XPUModelRunner( # type: ignore
  63. model_config,
  64. parallel_config,
  65. scheduler_config,
  66. device_config,
  67. cache_config,
  68. load_config=self.load_config,
  69. lora_config=self.lora_config,
  70. kv_cache_dtype=self.cache_config.cache_dtype,
  71. is_driver_worker=is_driver_worker,
  72. multimodal_config=multimodal_config,
  73. )
  74. # Uninitialized cache engine. Will be initialized by
  75. # initialize_cache.
  76. self.cache_engine: List[CacheEngine]
  77. self.gpu_cache: Optional[List[List[torch.Tensor]]]
  78. def init_device(self) -> None:
  79. if self.device_config.device.type == "xpu" and is_xpu():
  80. self.device = torch.device(f"xpu:{self.local_rank}")
  81. torch.xpu.set_device(self.device)
  82. torch.xpu.empty_cache()
  83. self.init_gpu_memory = torch.xpu.get_device_properties(
  84. self.local_rank).total_memory
  85. else:
  86. raise RuntimeError(
  87. f"Not support device type: {self.device_config.device}")
  88. # Initialize the distributed environment.
  89. self.init_worker_distributed_environment()
  90. # Initialize the model.
  91. set_random_seed(self.model_config.seed)
  92. # keep this method for `empty_cache` and `synchronize` api
  93. @torch.inference_mode()
  94. def determine_num_available_blocks(self) -> Tuple[int, int]:
  95. """Profiles the peak memory usage of the model to determine how many
  96. KV blocks may be allocated without OOMs.
  97. The engine will first conduct a profiling of the existing memory usage.
  98. Then, it calculate the maximum possible number of GPU and CPU blocks
  99. that can be allocated with the remaining free memory.
  100. .. tip::
  101. You may limit the usage of GPU memory
  102. by adjusting the `gpu_memory_utilization` parameter.
  103. """
  104. # Profile the memory usage of the model and get the maximum number of
  105. # cache blocks that can be allocated with the remaining free memory.
  106. torch.xpu.empty_cache()
  107. # Execute a forward pass with dummy inputs to profile the memory usage
  108. # of the model.
  109. self.model_runner.profile_run()
  110. # Calculate the number of blocks that can be allocated with the
  111. # profiled peak memory.
  112. torch.xpu.synchronize()
  113. used_memory = torch.xpu.memory_allocated()
  114. total_gpu_memory = torch.xpu.get_device_properties(
  115. self.local_rank).total_memory
  116. free_gpu_memory = total_gpu_memory - used_memory
  117. # NOTE: Here we assume that the other processes using the same
  118. # GPU did not change their memory usage during the profiling.
  119. peak_memory = self.init_gpu_memory - free_gpu_memory
  120. assert peak_memory > 0, (
  121. "Error in memory profiling. This happens when the GPU memory was "
  122. "not properly cleaned up before initializing the Aphrodite.")
  123. cache_block_size = self.get_cache_block_size_bytes()
  124. num_gpu_blocks = int(
  125. (total_gpu_memory * self.cache_config.gpu_memory_utilization -
  126. peak_memory) // cache_block_size)
  127. num_cpu_blocks = int(self.cache_config.swap_space_bytes //
  128. cache_block_size)
  129. num_gpu_blocks = max(num_gpu_blocks, 0)
  130. num_cpu_blocks = max(num_cpu_blocks, 0)
  131. gc.collect()
  132. torch.xpu.empty_cache()
  133. return num_gpu_blocks, num_cpu_blocks
  134. def _warm_up_model(self) -> None:
  135. # IPEX don't support capture graph yet
  136. pass
  137. def init_worker_distributed_environment(self) -> None:
  138. """Initialize the distributed environment."""
  139. parallel_config = self.parallel_config
  140. rank = self.rank
  141. distributed_init_method = self.distributed_init_method
  142. if torch.distributed.is_initialized():
  143. torch_world_size = torch.distributed.get_world_size()
  144. if torch_world_size != parallel_config.world_size:
  145. raise RuntimeError(
  146. "torch.distributed is already initialized but the torch "
  147. "world size does not match parallel_config.world_size "
  148. f"({torch_world_size} vs. {parallel_config.world_size}).")
  149. elif not distributed_init_method:
  150. raise ValueError(
  151. "distributed_init_method must be set if torch.distributed "
  152. "is not already initialized")
  153. else:
  154. # use sockets as default Level zero IPC exchange backend. By
  155. # default oneccl will use `drmfd` as mechanism which need extra
  156. # dependency (libdrm and drm headers) on your system.
  157. ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE",
  158. "sockets")
  159. os.environ['CCL_ZE_IPC_EXCHANGE'] = ENV_CCL_ZE_IPC_EXCHANGE
  160. init_distributed_environment(
  161. world_size=parallel_config.world_size,
  162. rank=rank,
  163. distributed_init_method=distributed_init_method,
  164. local_rank=self.local_rank,
  165. backend="ccl")
  166. ensure_model_parallel_initialized(
  167. parallel_config.tensor_parallel_size,
  168. parallel_config.pipeline_parallel_size)