# cpu_gpu_block_allocator.py
from typing import Dict, FrozenSet, List, Optional

from aphrodite.common.utils import Device
from aphrodite.processing.block.interfaces import (Block, BlockAllocator,
                                                   DeviceAwareBlockAllocator)
from aphrodite.processing.block.naive_block import (NaiveBlock,
                                                    NaiveBlockAllocator)
from aphrodite.processing.block.prefix_caching_block import \
    PrefixCachingBlockAllocator
  9. class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
  10. """A block allocator that can allocate blocks on both CPU and GPU memory.
  11. This class implements the `DeviceAwareBlockAllocator` interface and provides
  12. functionality for allocating and managing blocks of memory on both CPU and
  13. GPU devices.
  14. The `CpuGpuBlockAllocator` maintains separate memory pools for CPU and GPU
  15. blocks, and allows for allocation, deallocation, forking, and swapping of
  16. blocks across these memory pools.
  17. """
  18. @staticmethod
  19. def create(
  20. allocator_type: str,
  21. num_gpu_blocks: int,
  22. num_cpu_blocks: int,
  23. block_size: int,
  24. ) -> DeviceAwareBlockAllocator:
  25. """Creates a CpuGpuBlockAllocator instance with the specified
  26. configuration.
  27. This static method creates and returns a CpuGpuBlockAllocator instance
  28. based on the provided parameters. It initializes the CPU and GPU block
  29. allocators with the specified number of blocks, block size, and
  30. allocator type.
  31. Args:
  32. allocator_type (str): The type of block allocator to use for CPU
  33. and GPU blocks. Currently supported values are "naive" and
  34. "prefix_caching".
  35. num_gpu_blocks (int): The number of blocks to allocate for GPU
  36. memory.
  37. num_cpu_blocks (int): The number of blocks to allocate for CPU
  38. memory.
  39. block_size (int): The size of each block in number of tokens.
  40. Returns:
  41. DeviceAwareBlockAllocator: A CpuGpuBlockAllocator instance with the
  42. specified configuration.
  43. Notes:
  44. - The block IDs are assigned contiguously, with GPU block IDs coming
  45. before CPU block IDs.
  46. """
  47. block_ids = list(range(num_gpu_blocks + num_cpu_blocks))
  48. gpu_block_ids = block_ids[:num_gpu_blocks]
  49. cpu_block_ids = block_ids[num_gpu_blocks:]
  50. if allocator_type == "naive":
  51. gpu_allocator = NaiveBlockAllocator(
  52. create_block=NaiveBlock,
  53. num_blocks=num_gpu_blocks,
  54. block_size=block_size,
  55. block_ids=gpu_block_ids,
  56. )
  57. cpu_allocator = NaiveBlockAllocator(
  58. create_block=NaiveBlock,
  59. num_blocks=num_cpu_blocks,
  60. block_size=block_size,
  61. block_ids=cpu_block_ids,
  62. )
  63. elif allocator_type == "prefix_caching":
  64. gpu_allocator = PrefixCachingBlockAllocator(
  65. num_blocks=num_gpu_blocks,
  66. block_size=block_size,
  67. block_ids=gpu_block_ids,
  68. )
  69. cpu_allocator = PrefixCachingBlockAllocator(
  70. num_blocks=num_cpu_blocks,
  71. block_size=block_size,
  72. block_ids=cpu_block_ids,
  73. )
  74. else:
  75. raise ValueError(f"Unknown allocator type {allocator_type=}")
  76. return CpuGpuBlockAllocator(
  77. cpu_block_allocator=cpu_allocator,
  78. gpu_block_allocator=gpu_allocator,
  79. )
  80. def __init__(
  81. self,
  82. cpu_block_allocator: BlockAllocator,
  83. gpu_block_allocator: BlockAllocator,
  84. ):
  85. assert not (
  86. cpu_block_allocator.all_block_ids
  87. & gpu_block_allocator.all_block_ids
  88. ), "cpu and gpu block allocators can't have intersection of block ids"
  89. self._allocators = {
  90. Device.CPU: cpu_block_allocator,
  91. Device.GPU: gpu_block_allocator,
  92. }
  93. self._block_ids_to_allocator = {}
  94. for _, allocator in self._allocators.items():
  95. for block_id in allocator.all_block_ids:
  96. self._block_ids_to_allocator[block_id] = allocator
  97. def allocate_mutable(self, prev_block: Optional[Block],
  98. device: Device) -> Block:
  99. """Allocates a new mutable block on the specified device.
  100. Args:
  101. prev_block (Optional[Block]): The previous block to in the sequence.
  102. Used for prefix hashing.
  103. device (Device): The device on which to allocate the new block.
  104. Returns:
  105. Block: The newly allocated mutable block.
  106. """
  107. return self._allocators[device].allocate_mutable(prev_block)
  108. def allocate_immutable(self, prev_block: Optional[Block],
  109. token_ids: List[int], device: Device) -> Block:
  110. """Allocates a new immutable block with the provided token IDs on the
  111. specified device.
  112. Args:
  113. prev_block (Optional[Block]): The previous block in the sequence.
  114. Used for prefix hashing.
  115. token_ids (List[int]): The list of token IDs to be stored in the new
  116. block.
  117. device (Device): The device on which to allocate the new block.
  118. Returns:
  119. Block: The newly allocated immutable block containing the provided
  120. token IDs.
  121. """
  122. return self._allocators[device].allocate_immutable(
  123. prev_block, token_ids)
  124. def free(self, block: Block) -> None:
  125. """Frees the memory occupied by the given block.
  126. Args:
  127. block (Block): The block to be freed.
  128. """
  129. allocator = self._block_ids_to_allocator[block.block_id]
  130. return allocator.free(block)
  131. def fork(self, last_block: Block) -> List[Block]:
  132. """Creates a new sequence of blocks that shares the same underlying
  133. memory as the original sequence.
  134. Args:
  135. last_block (Block): The last block in the original sequence.
  136. Returns:
  137. List[Block]: A new list of blocks that shares the same memory as the
  138. original sequence.
  139. """
  140. allocator = self._block_ids_to_allocator[last_block.block_id]
  141. return allocator.fork(last_block)
  142. def get_num_free_blocks(self, device: Device) -> int:
  143. """Returns the number of free blocks available on the specified device.
  144. Args:
  145. device (Device): The device for which to query the number of free
  146. blocks.
  147. Returns:
  148. int: The number of free blocks available on the specified device.
  149. """
  150. return self._allocators[device].get_num_free_blocks()
  151. def clear_copy_on_writes(self) -> Dict[int, List[int]]:
  152. """Clears the copy-on-write (CoW) state and returns the mapping of
  153. source to destination block IDs.
  154. Returns:
  155. Dict[int, List[int]]: A dictionary mapping source block IDs to lists
  156. of destination block IDs.
  157. """
  158. # CoW only supported on GPU
  159. device = Device.GPU
  160. return self._allocators[device].clear_copy_on_writes()
  161. def mark_blocks_as_computed(self) -> None:
  162. # Prefix caching only supported on GPU.
  163. device = Device.GPU
  164. return self._allocators[device].mark_blocks_as_computed()
  165. def get_common_computed_block_ids(
  166. self, seq_block_ids: List[List[int]]) -> List[int]:
  167. # Prefix caching only supported on GPU.
  168. device = Device.GPU
  169. return self._allocators[device].get_common_computed_block_ids(
  170. seq_block_ids)
  171. def all_block_ids(self) -> frozenset[int]:
  172. return frozenset(self._block_ids_to_allocator.keys())