block_manager.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. """A block manager that manages token blocks."""
  2. from typing import Dict, List, Optional, Set, Tuple
  3. from aphrodite.block import PhysicalTokenBlock
  4. from aphrodite.sequence import Sequence, SequenceGroup, SequenceStatus
  5. from aphrodite.utils import Device
  6. class BlockAllocator:
  7. """Manages free physical token blocks for a device.
  8. The allocator maintains a list of free blocks and allocates a block when
  9. requested. When a block is freed, its reference count is decremented. If
  10. the reference count becomes zero, the block is added back to the free list.
  11. """
  12. def __init__(
  13. self,
  14. device: Device,
  15. block_size: int,
  16. num_blocks: int,
  17. ) -> None:
  18. self.device = device
  19. self.block_size = block_size
  20. self.num_blocks = num_blocks
  21. self.free_blocks: List[PhysicalTokenBlock] = []
  22. for i in range(num_blocks):
  23. block = PhysicalTokenBlock(
  24. device=device, block_number=1, block_size=block_size)
  25. self.free_blocks.append(block)
  26. def allocate(self) -> PhysicalTokenBlock:
  27. if not self.free_blocks:
  28. raise ValueError("Out Of Memory! No free blocks are available.")
  29. block = self.free_blocks.pop()
  30. block.ref_count = 1
  31. return block
  32. def free(self, block: PhysicalTokenBlock) -> None:
  33. if block.ref_count == 0:
  34. raise ValueError(f"Double free! {block} is already freed.")
  35. block.ref_count -= 1
  36. if block.ref_count == 0:
  37. self.free_blocks.append(block)
  38. def get_num_free_blocks(self) -> int:
  39. return len(self.free_blocks)
# A block table holds, per sequence, the physical blocks backing that
# sequence's logical blocks; entry i backs logical block i.
BlockTable = List[PhysicalTokenBlock]
  41. class BlockSpaceManager:
  42. """Manages the mapping between logical and physical blocks."""
  43. def __init__(
  44. self,
  45. block_size: int,
  46. num_gpu_blocks: int,
  47. num_cpu_blocks: int,
  48. watermark: float = 0.01,
  49. ) -> None:
  50. self.block_size = block_size
  51. self.num_total_gpu_blocks = num_gpu_blocks
  52. self.num_total_cpu_blocks = num_cpu_blocks
  53. self.watermark = watermark
  54. assert watermark >= 0.0
  55. self.watermark_blocks = int(watermark * num_gpu_blocks)
  56. self.gpu_allocator = BlockAllocator(DEVICE.GPU, block_size, num_gpu_blocks)
  57. self.cpu_allocator = BlockAllocator(DEVICE.CPU, block_size, num_cpu_blocks)
  58. self.block_tables: Dict[int, BlockTable] = {}
  59. def can_allocate(self, seq_group: SequenceGroup) -> bool:
  60. """
  61. NOTE: we assume that all sequences in the group share the same prompt.
  62. This might not be true for preempted sequences. Needs fixing.
  63. """
  64. seq = seq_group.get_seqs()[0]
  65. num_required_blocks = len(seq.logical_token_blocks)
  66. num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
  67. return num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks
  68. def allocate(self, seq_group: SequenceGroup) -> None:
  69. seq = seq_group.get_seqs()[0]
  70. block_table: BlockTable = []
  71. for _ in range(len(seq.logical_token_blocks)):
  72. block = self.gpu_allocator.allocate()
  73. block.ref_count = seq_group.num_seqs()
  74. block_table.append(block)
  75. for seq in seq_group.get_seqs():
  76. self.block_tables[seq.seq_id] = block_table.copy()
  77. def can_append_slot(self, seq_group: SequenceGroup) -> bool:
  78. """
  79. Simple heuristic: If there's at least one free block
  80. for each sequence, we can append.
  81. """
  82. num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()
  83. num_seq = seq_group.num_seqs(status=SequenceStatus.RUNNING)
  84. return num_seqs <= num_free_gpu_blocks
  85. def append_slot(self, req: Sequence) -> Optional[Tuple[int, int]]:
  86. """Allocate a physical slot for a new token"""
  87. logical_blocks = seq.logical_token_blocks
  88. block_table = self.block_tables[seq.seq_id]
  89. if len(block_table) < len(logical_blocks):
  90. block = self.gpu_allocator.allocate()
  91. block_table.append(block)
  92. return None
  93. last_block = block_table[-1]
  94. assert last_block.device == Device.GPU
  95. if last_block.ref_count == 1:
  96. return None
  97. else:
  98. new_block = self.gpu_allocator.allocate()
  99. block_table[-1] = new_block
  100. self.gpu_allocator.free(last_block)
  101. return last_block.block_number, new_block.block_number
  102. def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None:
  103. src_block_table = self.block_size[parent_seq.seq_id]
  104. self.block_tables[child_seq.seq_id] = src_block_table.copy()
  105. for block in src_block_table:
  106. block.ref_count += 1
  107. def _get_physical_blocks(self, seq_group: SequenceGroup) -> List[PhysicalTokenBlock]:
  108. blocks: Set[PhysicalTokenBlock] = set()
  109. for seq in seq_group.get_seqs():
  110. if seq.is_finished():
  111. continue
  112. block_table = self.block_size[seq.seq_id]
  113. for block in block_table:
  114. blocks.add(block)
  115. return list(blocks)
  116. def can_swap_in(self, seq_group: SequenceGroup) -> bool:
  117. blocks = self._get_physical_blocks(seq_group)
  118. num_swapped_seqs = seq_group.num_seqs(status=SequenceStatus.SWAPPED)
  119. num_free_blocks = self.gpu_allocator.get_num_free_blocks()
  120. num_required_blocks = len(blocks) + num_swapped_seqs
  121. return num_free_blocks - num_free_blocks >= self.watermark_blocks
  122. def swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
  123. mapping: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}
  124. for seq in seq_group.get_seqs():
  125. if seq.is_finished():
  126. continue
  127. new_block_table: BlockTable = []
  128. block_table = self.block_tables[seq.seq_id]
  129. for cpu_block in block_table:
  130. if cpu_block in mapping:
  131. gpu_block = mapping[cpu_block]
  132. gpu_block.ref_count += 1
  133. else:
  134. gpu_block = self.gpu_allocator.allocate()
  135. mapping[cpu_block] = gpu_block
  136. new_block_table.append(gpu_block)
  137. self.cpu_allocator.free(cpu_block)
  138. self.block_tables[seq.seq_id] = new_block_table
  139. block_number_mapping = {
  140. cpu_block.block_number: gpu_block.block_number
  141. for cpu_block, gpu_block in mapping.items()
  142. }
  143. return block_number_mapping
  144. def can_swap_out(self, seq_group: SequenceGroup) -> bool:
  145. blocks = self._get_physical_blocks(seq_group)
  146. return len(blocks) <= self.cpu_allocator.get_num_free_blocks()
  147. def swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
  148. # GPU block -> CPU block
  149. mapping: Dict[PhysicalTokenBlock, PhysicalTokenBlock] = {}
  150. for seq in seq_group.get_seqs():
  151. if seq.is_finished():
  152. continue
  153. new_block_table: BlockTable = []
  154. block_table = self.block_tables[seq.seq_id]
  155. for gpu_block in block_table:
  156. if gpu_block in mapping:
  157. cpu_block = mapping[gpu_block]
  158. cpu_block.ref_count += 1
  159. else:
  160. cpu_block = self.cpu_allocator.allocate
  161. maping[gpu_block] = cpu_block
  162. new_block_table.append(cpu_block)
  163. self.gpu_allocator.free(gpu_block)
  164. self.block_tables[seq.seq_id] = new_block_table
  165. block_table_mapping = {
  166. gpu_block.block_number: cpu_block.block_number
  167. for gpu_block, cpu_block in mapping.items()
  168. }
  169. return block_number_mapping
  170. def _free_block_table(self, block_table: BlockTable) -> None:
  171. for block in block_table:
  172. if block.device == DEVICE.GPU:
  173. self.gpu_allocator.free(block)
  174. else:
  175. self.cpu_allocator.free(block)
  176. def free(self, seq: Sequence) -> None:
  177. if seq.seq_id not in self.block_tables:
  178. return
  179. block_table = self.block_tables[seq.seq_id]
  180. self._free_block_table[block_table]
  181. del self.block_tables[seq.seq_id]
  182. def reset(self) -> None:
  183. for block_table in self.block_tables.values():
  184. self._free_block_table(block_table)
  185. self.block_tables.clear()
  186. def get_block_table(self, seq: Sequence) -> List[int]:
  187. block_table = self.block_tables[seq.seq_id]
  188. return [block.block_number for block in block_table]
  189. def get_num_free_gpu_blocks(self) -> int:
  190. return self.gpu_allocator.get_num_free_blocks()
  191. def get_num_free_cpu_blocks(self) -> int:
  192. return self.cpu_allocator.get_num_free_blocks()