cpu_gpu_block_allocator.py

from typing import Dict, FrozenSet, List, Optional, Tuple

from aphrodite.common.utils import Device
from aphrodite.processing.block.interfaces import (Block, BlockAllocator,
                                                   BlockId,
                                                   DeviceAwareBlockAllocator)
from aphrodite.processing.block.naive_block import (NaiveBlock,
                                                    NaiveBlockAllocator)
from aphrodite.processing.block.prefix_caching_block import (
    PrefixCachingBlockAllocator)


class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
    """A block allocator that can allocate blocks on both CPU and GPU memory.

    This class implements the `DeviceAwareBlockAllocator` interface and
    provides functionality for allocating and managing blocks of memory on
    both CPU and GPU devices.

    The `CpuGpuBlockAllocator` maintains separate memory pools for CPU and GPU
    blocks, and allows for allocation, deallocation, forking, and swapping of
    blocks across these memory pools.
    """

    @staticmethod
    def create(
        allocator_type: str,
        num_gpu_blocks: int,
        num_cpu_blocks: int,
        block_size: int,
    ) -> DeviceAwareBlockAllocator:
        """Creates a CpuGpuBlockAllocator instance with the specified
        configuration.

        This static method creates and returns a CpuGpuBlockAllocator instance
        based on the provided parameters. It initializes the CPU and GPU block
        allocators with the specified number of blocks, block size, and
        allocator type.

        Args:
            allocator_type (str): The type of block allocator to use for CPU
                and GPU blocks. Currently supported values are "naive" and
                "prefix_caching".
            num_gpu_blocks (int): The number of blocks to allocate for GPU
                memory.
            num_cpu_blocks (int): The number of blocks to allocate for CPU
                memory.
            block_size (int): The size of each block in number of tokens.

        Returns:
            DeviceAwareBlockAllocator: A CpuGpuBlockAllocator instance with the
                specified configuration.

        Notes:
            - The block IDs are assigned contiguously, with GPU block IDs
              coming before CPU block IDs.
        """
        block_ids = list(range(num_gpu_blocks + num_cpu_blocks))
        gpu_block_ids = block_ids[:num_gpu_blocks]
        cpu_block_ids = block_ids[num_gpu_blocks:]

        if allocator_type == "naive":
            gpu_allocator: BlockAllocator = NaiveBlockAllocator(
                create_block=NaiveBlock,  # type: ignore
                num_blocks=num_gpu_blocks,
                block_size=block_size,
                block_ids=gpu_block_ids,
            )

            cpu_allocator: BlockAllocator = NaiveBlockAllocator(
                create_block=NaiveBlock,  # type: ignore
                num_blocks=num_cpu_blocks,
                block_size=block_size,
                block_ids=cpu_block_ids,
            )
        elif allocator_type == "prefix_caching":
            gpu_allocator = PrefixCachingBlockAllocator(
                num_blocks=num_gpu_blocks,
                block_size=block_size,
                block_ids=gpu_block_ids,
            )

            cpu_allocator = PrefixCachingBlockAllocator(
                num_blocks=num_cpu_blocks,
                block_size=block_size,
                block_ids=cpu_block_ids,
            )
        else:
            raise ValueError(f"Unknown allocator type {allocator_type=}")

        return CpuGpuBlockAllocator(
            cpu_block_allocator=cpu_allocator,
            gpu_block_allocator=gpu_allocator,
        )
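
    # The contiguous ID layout described above means that, for example, with
    # num_gpu_blocks=4 and num_cpu_blocks=2 the GPU allocator owns block IDs
    # [0, 1, 2, 3] and the CPU allocator owns [4, 5]. A hedged usage sketch
    # (the argument values are illustrative, not engine defaults):
    #
    #     allocator = CpuGpuBlockAllocator.create(
    #         allocator_type="prefix_caching",
    #         num_gpu_blocks=4,
    #         num_cpu_blocks=2,
    #         block_size=16,
    #     )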

    def __init__(self, cpu_block_allocator: BlockAllocator,
                 gpu_block_allocator: BlockAllocator):
        assert not (
            cpu_block_allocator.all_block_ids
            & gpu_block_allocator.all_block_ids
        ), "cpu and gpu block allocators can't have intersection of block ids"

        self._allocators = {
            Device.CPU: cpu_block_allocator,
            Device.GPU: gpu_block_allocator,
        }

        self._swap_mapping: Dict[int, int] = {}
        self._null_block: Optional[Block] = None

        self._block_ids_to_allocator: Dict[int, BlockAllocator] = {}
        for _, allocator in self._allocators.items():
            for block_id in allocator.all_block_ids:
                self._block_ids_to_allocator[block_id] = allocator

    def allocate_or_get_null_block(self) -> Block:
        """Returns the null block, allocating it lazily on first use.

        The null block is a read-only placeholder backed by a GPU block; see
        `NullBlock` below.
        """
        if self._null_block is None:
            self._null_block = NullBlock(
                self.allocate_mutable_block(None, Device.GPU))
        return self._null_block

    def allocate_mutable_block(self, prev_block: Optional[Block],
                               device: Device) -> Block:
        """Allocates a new mutable block on the specified device.

        Args:
            prev_block (Optional[Block]): The previous block in the sequence.
                Used for prefix hashing.
            device (Device): The device on which to allocate the new block.

        Returns:
            Block: The newly allocated mutable block.
        """
        return self._allocators[device].allocate_mutable_block(prev_block)

    def allocate_immutable_blocks(self, prev_block: Optional[Block],
                                  block_token_ids: List[List[int]],
                                  device: Optional[Device]) -> List[Block]:
        """Allocates a group of immutable blocks containing the provided
        block token IDs on the specified device.

        Args:
            prev_block (Optional[Block]): The previous block in the sequence.
                Used for prefix hashing.
            block_token_ids (List[List[int]]): A list of token-ID lists, one
                per block to be allocated.
            device (Optional[Device]): The device on which to allocate the new
                blocks.

        Returns:
            List[Block]: The newly allocated list of immutable blocks
                containing the provided block token IDs.
        """
        return self._allocators[device].allocate_immutable_blocks(
            prev_block, block_token_ids)
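
    # Shape of `block_token_ids` (a hedged sketch; the token values and
    # `allocator` instance are illustrative): with block_size=4, eight prompt
    # tokens map to two full immutable blocks:
    #
    #     blocks = allocator.allocate_immutable_blocks(
    #         prev_block=None,
    #         block_token_ids=[[11, 12, 13, 14], [15, 16, 17, 18]],
    #         device=Device.GPU,
    #     )
    #     # len(blocks) == 2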

    def allocate_immutable_block(self, prev_block: Optional[Block],
                                 token_ids: List[int],
                                 device: Device) -> Block:
        """Allocates a new immutable block with the provided token IDs on the
        specified device.

        Args:
            prev_block (Optional[Block]): The previous block in the sequence.
                Used for prefix hashing.
            token_ids (List[int]): The list of token IDs to be stored in the
                new block.
            device (Device): The device on which to allocate the new block.

        Returns:
            Block: The newly allocated immutable block containing the provided
                token IDs.
        """
        return self._allocators[device].allocate_immutable_block(
            prev_block, token_ids)

    def free(self, block: Block) -> None:
        """Frees the memory occupied by the given block.

        Args:
            block (Block): The block to be freed.
        """
        # Null block should never be freed.
        if isinstance(block, NullBlock):
            return
        block_id = block.block_id
        assert block_id is not None
        allocator = self._block_ids_to_allocator[block_id]
        allocator.free(block)

    def fork(self, last_block: Block) -> List[Block]:
        """Creates a new sequence of blocks that shares the same underlying
        memory as the original sequence.

        Args:
            last_block (Block): The last block in the original sequence.

        Returns:
            List[Block]: A new list of blocks that shares the same memory as
                the original sequence.
        """
        # Do not attempt to fork the null block.
        assert not isinstance(last_block, NullBlock)
        block_id = last_block.block_id
        assert block_id is not None
        allocator = self._block_ids_to_allocator[block_id]
        return allocator.fork(last_block)

    def get_num_free_blocks(self, device: Device) -> int:
        """Returns the number of free blocks available on the specified
        device.

        Args:
            device (Device): The device for which to query the number of free
                blocks. Passing None raises a KeyError.

        Returns:
            int: The number of free blocks available on the specified device.
        """
        return self._allocators[device].get_num_free_blocks()

    def get_num_total_blocks(self, device: Device) -> int:
        return self._allocators[device].get_num_total_blocks()

    def get_physical_block_id(self, device: Device, absolute_id: int) -> int:
        """Returns the zero-offset block ID on the given device for the
        provided absolute block ID.

        Args:
            device (Device): The device for which to query the relative block
                ID.
            absolute_id (int): The absolute block ID within the whole
                allocator.

        Returns:
            int: The zero-offset block ID on the given device.
        """
        return self._allocators[device].get_physical_block_id(absolute_id)
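
    # With the contiguous layout from `create` (GPU IDs first), a hedged
    # example: for num_gpu_blocks=4 and num_cpu_blocks=2, the CPU block whose
    # absolute ID is 5 has physical (zero-offset) ID 1 on the CPU device:
    #
    #     allocator.get_physical_block_id(Device.CPU, 5)  # -> 1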

    def swap(self, blocks: List[Block], src_device: Device,
             dst_device: Device) -> Dict[int, int]:
        """Executes the swap for the given blocks from src_device to
        dst_device, saves the mapping for the current move, and appends it to
        the accumulated `self._swap_mapping` for each scheduling move.

        Args:
            blocks: List of blocks to be swapped.
            src_device (Device): Device to swap the 'blocks' from.
            dst_device (Device): Device to swap the 'blocks' to.

        Returns:
            Dict[int, int]: Swap mapping from src_device to dst_device.
        """
        src_block_ids = [block.block_id for block in blocks]
        self._allocators[src_device].swap_out(blocks)
        self._allocators[dst_device].swap_in(blocks)
        dst_block_ids = [block.block_id for block in blocks]

        current_swap_mapping: Dict[int, int] = {}
        for src_block_id, dst_block_id in zip(src_block_ids, dst_block_ids):
            if src_block_id is not None and dst_block_id is not None:
                self._swap_mapping[src_block_id] = dst_block_id
                current_swap_mapping[src_block_id] = dst_block_id
        return current_swap_mapping
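
    # A hedged sketch of what the returned mapping looks like (the block IDs
    # and `seq_blocks` are illustrative and depend on allocator state):
    #
    #     mapping = allocator.swap(seq_blocks, Device.GPU, Device.CPU)
    #     # e.g. mapping == {0: 4, 1: 5}: GPU block 0 now lives in CPU block
    #     # 4, and GPU block 1 in CPU block 5.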

    def get_num_blocks_touched(self,
                               blocks: List[Block],
                               device: Device,
                               num_lookahead_slots: int = 0) -> int:
        """Returns the number of blocks that will be touched by swapping the
        given blocks in/out on the 'device'.

        Args:
            blocks: List of blocks to be swapped.
            device (Device): Device to swap the 'blocks' on.
            num_lookahead_slots (int): Number of lookahead slots used in
                speculative decoding, defaults to 0.

        Returns:
            int: The number of blocks that will be touched by swapping the
                given blocks in/out on the 'device'.
        """
        return self._allocators[device].get_num_blocks_touched(
            blocks, num_lookahead_slots)

    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
        """Clears the copy-on-write (CoW) state and returns the mapping of
        source to destination block IDs.

        Returns:
            List[Tuple[int, int]]: A list mapping source block IDs to
                destination block IDs.
        """
        # CoW is only supported on GPU.
        device = Device.GPU
        return self._allocators[device].clear_copy_on_writes()
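
    # A hedged example of the returned pairs (IDs illustrative): [(2, 7)]
    # means the contents of block 2 must be copied into newly allocated
    # block 7 before the forked sequence appends new tokens.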

    def mark_blocks_as_accessed(self, block_ids: List[int],
                                now: float) -> None:
        """Mark blocks as accessed; only used for prefix caching."""
        # Prefix caching is only supported on GPU.
        device = Device.GPU
        return self._allocators[device].mark_blocks_as_accessed(
            block_ids, now)

    def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
        """Mark blocks as computed; only used for prefix caching."""
        # Prefix caching is only supported on GPU.
        device = Device.GPU
        return self._allocators[device].mark_blocks_as_computed(block_ids)

    def get_computed_block_ids(self, prev_computed_block_ids: List[int],
                               block_ids: List[int],
                               skip_last_block_id: bool) -> List[int]:
        # Prefix caching is only supported on GPU.
        device = Device.GPU
        return self._allocators[device].get_computed_block_ids(
            prev_computed_block_ids, block_ids, skip_last_block_id)

    def get_common_computed_block_ids(
            self, computed_seq_block_ids: List[List[int]]) -> List[int]:
        # Prefix caching is only supported on GPU.
        device = Device.GPU
        return self._allocators[device].get_common_computed_block_ids(
            computed_seq_block_ids)

    @property
    def all_block_ids(self) -> FrozenSet[int]:
        return frozenset(self._block_ids_to_allocator.keys())

    def get_and_reset_swaps(self) -> List[Tuple[int, int]]:
        """Returns and clears the accumulated mapping of source to destination
        block IDs.

        Will be called after every swap operation for now, and after every
        schedule once BlockManagerV2 becomes the default. Currently not
        useful.

        Returns:
            List[Tuple[int, int]]: A mapping of source to destination block
                IDs.
        """
        mapping = self._swap_mapping.copy()
        self._swap_mapping.clear()
        return list(mapping.items())
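
    # Continuing the hedged swap sketch above: after swap() returned {0: 4},
    # a later call to get_and_reset_swaps() yields [(0, 4)] and leaves the
    # accumulated `_swap_mapping` empty until the next swap.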


class NullBlock(Block):
    """
    Null blocks are used as placeholders for KV cache blocks that have been
    dropped due to sliding window.

    This implementation just wraps an ordinary block and prevents it from
    being modified. It also allows for testing whether a block is a NullBlock
    via isinstance().
    """

    def __init__(self, proxy: Block):
        super().__init__()
        self._proxy = proxy

    def append_token_ids(self, token_ids: List[BlockId]):
        raise ValueError("null block should not be modified")

    @property
    def block_id(self):
        return self._proxy.block_id

    @block_id.setter
    def block_id(self, value: Optional[BlockId]):
        raise ValueError("null block should not be modified")

    @property
    def token_ids(self) -> List[BlockId]:
        return self._proxy.token_ids

    @property
    def num_tokens_total(self) -> int:
        raise NotImplementedError(
            "num_tokens_total is not used for null block")

    @property
    def num_empty_slots(self) -> BlockId:
        return self._proxy.num_empty_slots

    @property
    def is_full(self):
        return self._proxy.is_full

    @property
    def prev_block(self):
        return self._proxy.prev_block

    @property
    def computed(self):
        return self._proxy.computed

    @computed.setter
    def computed(self, value):
        self._proxy.computed = value

    @property
    def last_accessed(self) -> float:
        return self._proxy.last_accessed

    @last_accessed.setter
    def last_accessed(self, last_accessed_ts: float):
        self._proxy.last_accessed = last_accessed_ts

    @property
    def content_hash(self):
        return self._proxy.content_hash
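

# A minimal usage sketch, kept out of the import path behind a __main__ guard.
# It only exercises the public API defined above; the block counts and block
# size are illustrative values, not defaults taken from the engine config.
if __name__ == "__main__":
    demo_allocator = CpuGpuBlockAllocator.create(
        allocator_type="naive",
        num_gpu_blocks=4,
        num_cpu_blocks=2,
        block_size=16,
    )
    # GPU IDs come first, so the six block IDs are [0, 1, 2, 3, 4, 5].
    print(sorted(demo_allocator.all_block_ids))

    # Allocate one mutable GPU block, check the free count, then release it.
    demo_block = demo_allocator.allocate_mutable_block(prev_block=None,
                                                       device=Device.GPU)
    print(demo_allocator.get_num_free_blocks(Device.GPU))  # 3 of 4 remain
    demo_allocator.free(demo_block)
    print(demo_allocator.get_num_free_blocks(Device.GPU))  # back to 4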