# cpu_gpu_block_allocator.py
  1. from typing import Dict, FrozenSet, List, Optional, Tuple
  2. from aphrodite.common.utils import Device
  3. from aphrodite.processing.block.interfaces import (Block, BlockAllocator,
  4. BlockId,
  5. DeviceAwareBlockAllocator)
  6. from aphrodite.processing.block.naive_block import (NaiveBlock,
  7. NaiveBlockAllocator)
  8. from aphrodite.processing.block.prefix_caching_block import \
  9. PrefixCachingBlockAllocator
  10. class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
  11. """A block allocator that can allocate blocks on both CPU and GPU memory.
  12. This class implements the `DeviceAwareBlockAllocator` interface and provides
  13. functionality for allocating and managing blocks of memory on both CPU and
  14. GPU devices.
  15. The `CpuGpuBlockAllocator` maintains separate memory pools for CPU and GPU
  16. blocks, and allows for allocation, deallocation, forking, and swapping of
  17. blocks across these memory pools.
  18. """
  19. @staticmethod
  20. def create(
  21. allocator_type: str,
  22. num_gpu_blocks: int,
  23. num_cpu_blocks: int,
  24. block_size: int,
  25. ) -> DeviceAwareBlockAllocator:
  26. """Creates a CpuGpuBlockAllocator instance with the specified
  27. configuration.
  28. This static method creates and returns a CpuGpuBlockAllocator instance
  29. based on the provided parameters. It initializes the CPU and GPU block
  30. allocators with the specified number of blocks, block size, and
  31. allocator type.
  32. Args:
  33. allocator_type (str): The type of block allocator to use for CPU
  34. and GPU blocks. Currently supported values are "naive" and
  35. "prefix_caching".
  36. num_gpu_blocks (int): The number of blocks to allocate for GPU
  37. memory.
  38. num_cpu_blocks (int): The number of blocks to allocate for CPU
  39. memory.
  40. block_size (int): The size of each block in number of tokens.
  41. Returns:
  42. DeviceAwareBlockAllocator: A CpuGpuBlockAllocator instance with the
  43. specified configuration.
  44. Notes:
  45. - The block IDs are assigned contiguously, with GPU block IDs coming
  46. before CPU block IDs.
  47. """
  48. block_ids = list(range(num_gpu_blocks + num_cpu_blocks))
  49. gpu_block_ids = block_ids[:num_gpu_blocks]
  50. cpu_block_ids = block_ids[num_gpu_blocks:]
  51. if allocator_type == "naive":
  52. gpu_allocator: BlockAllocator = NaiveBlockAllocator(
  53. create_block=NaiveBlock, # type: ignore
  54. num_blocks=num_gpu_blocks,
  55. block_size=block_size,
  56. block_ids=gpu_block_ids,
  57. )
  58. cpu_allocator: BlockAllocator = NaiveBlockAllocator(
  59. create_block=NaiveBlock, # type: ignore
  60. num_blocks=num_cpu_blocks,
  61. block_size=block_size,
  62. block_ids=cpu_block_ids,
  63. )
  64. elif allocator_type == "prefix_caching":
  65. gpu_allocator = PrefixCachingBlockAllocator(
  66. num_blocks=num_gpu_blocks,
  67. block_size=block_size,
  68. block_ids=gpu_block_ids,
  69. )
  70. cpu_allocator = PrefixCachingBlockAllocator(
  71. num_blocks=num_cpu_blocks,
  72. block_size=block_size,
  73. block_ids=cpu_block_ids,
  74. )
  75. else:
  76. raise ValueError(f"Unknown allocator type {allocator_type=}")
  77. return CpuGpuBlockAllocator(
  78. cpu_block_allocator=cpu_allocator,
  79. gpu_block_allocator=gpu_allocator,
  80. )
  81. def __init__(
  82. self,
  83. cpu_block_allocator: BlockAllocator,
  84. gpu_block_allocator: BlockAllocator,
  85. ):
  86. assert not (
  87. cpu_block_allocator.all_block_ids
  88. & gpu_block_allocator.all_block_ids
  89. ), "cpu and gpu block allocators can't have intersection of block ids"
  90. self._allocators = {
  91. Device.CPU: cpu_block_allocator,
  92. Device.GPU: gpu_block_allocator,
  93. }
  94. self._block_ids_to_allocator: Dict[int, BlockAllocator] = {}
  95. for _, allocator in self._allocators.items():
  96. for block_id in allocator.all_block_ids:
  97. self._block_ids_to_allocator[block_id] = allocator
  98. def allocate_mutable(self, prev_block: Optional[Block],
  99. device: Device) -> Block:
  100. """Allocates a new mutable block on the specified device.
  101. Args:
  102. prev_block (Optional[Block]): The previous block to in the sequence.
  103. Used for prefix hashing.
  104. device (Device): The device on which to allocate the new block.
  105. Returns:
  106. Block: The newly allocated mutable block.
  107. """
  108. return self._allocators[device].allocate_mutable(prev_block)
  109. def allocate_immutable(self, prev_block: Optional[Block],
  110. token_ids: List[int], device: Device) -> Block:
  111. """Allocates a new immutable block with the provided token IDs on the
  112. specified device.
  113. Args:
  114. prev_block (Optional[Block]): The previous block in the sequence.
  115. Used for prefix hashing.
  116. token_ids (List[int]): The list of token IDs to be stored in the new
  117. block.
  118. device (Device): The device on which to allocate the new block.
  119. Returns:
  120. Block: The newly allocated immutable block containing the provided
  121. token IDs.
  122. """
  123. return self._allocators[device].allocate_immutable(
  124. prev_block, token_ids)
  125. def free(self, block: Block) -> None:
  126. """Frees the memory occupied by the given block.
  127. Args:
  128. block (Block): The block to be freed.
  129. """
  130. block_id = block.block_id
  131. assert block_id is not None
  132. allocator = self._block_ids_to_allocator[block_id]
  133. return allocator.free(block)
  134. def fork(self, last_block: Block) -> List[Block]:
  135. """Creates a new sequence of blocks that shares the same underlying
  136. memory as the original sequence.
  137. Args:
  138. last_block (Block): The last block in the original sequence.
  139. Returns:
  140. List[Block]: A new list of blocks that shares the same memory as the
  141. original sequence.
  142. """
  143. block_id = last_block.block_id
  144. assert block_id is not None
  145. allocator = self._block_ids_to_allocator[block_id]
  146. return allocator.fork(last_block)
  147. def get_num_free_blocks(self, device: Device) -> int:
  148. """Returns the number of free blocks available on the specified device.
  149. Args:
  150. device (Device): The device for which to query the number of free
  151. blocks. AssertionError is raised if None is passed.
  152. Returns:
  153. int: The number of free blocks available on the specified device.
  154. """
  155. return self._allocators[device].get_num_free_blocks()
  156. def get_num_total_blocks(self, device: Device) -> int:
  157. return self._allocators[device].get_num_total_blocks()
  158. def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
  159. """Clears the copy-on-write (CoW) state and returns the mapping of
  160. source to destination block IDs.
  161. Returns:
  162. List[Tuple[int, int]]: A list mapping source block IDs to
  163. destination block IDs.
  164. """
  165. # CoW only supported on GPU
  166. device = Device.GPU
  167. return self._allocators[device].clear_copy_on_writes()
  168. def mark_blocks_as_accessed(self, block_ids: List[int],
  169. now: float) -> None:
  170. """Mark blocks as accessed, only use for prefix caching."""
  171. # Prefix caching only supported on GPU.
  172. device = Device.GPU
  173. return self._allocators[device].mark_blocks_as_accessed(block_ids, now)
  174. def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
  175. """Mark blocks as accessed, only use for prefix caching."""
  176. # Prefix caching only supported on GPU.
  177. device = Device.GPU
  178. return self._allocators[device].mark_blocks_as_computed(block_ids)
  179. def get_common_computed_block_ids(
  180. self, seq_block_ids: List[List[int]]) -> List[int]:
  181. # Prefix caching only supported on GPU.
  182. device = Device.GPU
  183. return self._allocators[device].get_common_computed_block_ids(
  184. seq_block_ids)
  185. @property
  186. def all_block_ids(self) -> FrozenSet[int]:
  187. return frozenset(self._block_ids_to_allocator.keys())
  188. def promote_to_immutable_block(self, block: Block) -> BlockId:
  189. raise NotImplementedError
  190. def cow_block_if_not_appendable(self, block: Block) -> Optional[BlockId]:
  191. raise NotImplementedError