123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- from typing import Dict, List, Optional
- from aphrodite.processing.block.interfaces import (
- Block,
- BlockAllocator,
- DeviceAwareBlockAllocator,
- )
- from aphrodite.processing.block.naive_block import (
- NaiveBlock,
- NaiveBlockAllocator,
- )
- from aphrodite.processing.block.prefix_caching_block import (
- PrefixCachingBlockAllocator, )
- from aphrodite.common.utils import Device
- class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
- """A block allocator that can allocate blocks on both CPU and GPU memory.
- This class implements the `DeviceAwareBlockAllocator` interface and provides
- functionality for allocating and managing blocks of memory on both CPU and
- GPU devices.
- The `CpuGpuBlockAllocator` maintains separate memory pools for CPU and GPU
- blocks, and allows for allocation, deallocation, forking, and swapping of
- blocks across these memory pools.
- """
- @staticmethod
- def create(
- allocator_type: str,
- num_gpu_blocks: int,
- num_cpu_blocks: int,
- block_size: int,
- ) -> DeviceAwareBlockAllocator:
- """Creates a CpuGpuBlockAllocator instance with the specified
- configuration.
- This static method creates and returns a CpuGpuBlockAllocator instance
- based on the provided parameters. It initializes the CPU and GPU block
- allocators with the specified number of blocks, block size, and
- allocator type.
- Args:
- allocator_type (str): The type of block allocator to use for CPU
- and GPU blocks. Currently supported values are "naive" and
- "prefix_caching".
- num_gpu_blocks (int): The number of blocks to allocate for GPU
- memory.
- num_cpu_blocks (int): The number of blocks to allocate for CPU
- memory.
- block_size (int): The size of each block in number of tokens.
- Returns:
- DeviceAwareBlockAllocator: A CpuGpuBlockAllocator instance with the
- specified configuration.
- Notes:
- - The block IDs are assigned contiguously, with GPU block IDs coming
- before CPU block IDs.
- """
- block_ids = list(range(num_gpu_blocks + num_cpu_blocks))
- gpu_block_ids = block_ids[:num_gpu_blocks]
- cpu_block_ids = block_ids[num_gpu_blocks:]
- if allocator_type == "naive":
- gpu_allocator = NaiveBlockAllocator(
- create_block=NaiveBlock,
- num_blocks=num_gpu_blocks,
- block_size=block_size,
- block_ids=gpu_block_ids,
- )
- cpu_allocator = NaiveBlockAllocator(
- create_block=NaiveBlock,
- num_blocks=num_cpu_blocks,
- block_size=block_size,
- block_ids=cpu_block_ids,
- )
- elif allocator_type == "prefix_caching":
- gpu_allocator = PrefixCachingBlockAllocator(
- num_blocks=num_gpu_blocks,
- block_size=block_size,
- block_ids=gpu_block_ids,
- )
- cpu_allocator = PrefixCachingBlockAllocator(
- num_blocks=num_cpu_blocks,
- block_size=block_size,
- block_ids=cpu_block_ids,
- )
- else:
- raise ValueError(f"Unknown allocator type {allocator_type=}")
- return CpuGpuBlockAllocator(
- cpu_block_allocator=cpu_allocator,
- gpu_block_allocator=gpu_allocator,
- )
- def __init__(
- self,
- cpu_block_allocator: BlockAllocator,
- gpu_block_allocator: BlockAllocator,
- ):
- assert not (
- cpu_block_allocator.all_block_ids
- & gpu_block_allocator.all_block_ids
- ), "cpu and gpu block allocators can't have intersection of block ids"
- self._allocators = {
- Device.CPU: cpu_block_allocator,
- Device.GPU: gpu_block_allocator,
- }
- self._block_ids_to_allocator = {}
- for _, allocator in self._allocators.items():
- for block_id in allocator.all_block_ids:
- self._block_ids_to_allocator[block_id] = allocator
- def allocate_mutable(self, prev_block: Optional[Block],
- device: Device) -> Block:
- """Allocates a new mutable block on the specified device.
- Args:
- prev_block (Optional[Block]): The previous block to in the sequence.
- Used for prefix hashing.
- device (Device): The device on which to allocate the new block.
- Returns:
- Block: The newly allocated mutable block.
- """
- return self._allocators[device].allocate_mutable(prev_block)
- def allocate_immutable(self, prev_block: Optional[Block],
- token_ids: List[int], device: Device) -> Block:
- """Allocates a new immutable block with the provided token IDs on the
- specified device.
- Args:
- prev_block (Optional[Block]): The previous block in the sequence.
- Used for prefix hashing.
- token_ids (List[int]): The list of token IDs to be stored in the new
- block.
- device (Device): The device on which to allocate the new block.
- Returns:
- Block: The newly allocated immutable block containing the provided
- token IDs.
- """
- return self._allocators[device].allocate_immutable(
- prev_block, token_ids)
- def free(self, block: Block) -> None:
- """Frees the memory occupied by the given block.
- Args:
- block (Block): The block to be freed.
- """
- allocator = self._block_ids_to_allocator[block.block_id]
- return allocator.free(block)
- def fork(self, last_block: Block) -> List[Block]:
- """Creates a new sequence of blocks that shares the same underlying
- memory as the original sequence.
- Args:
- last_block (Block): The last block in the original sequence.
- Returns:
- List[Block]: A new list of blocks that shares the same memory as the
- original sequence.
- """
- allocator = self._block_ids_to_allocator[last_block.block_id]
- return allocator.fork(last_block)
- def get_num_free_blocks(self, device: Device) -> int:
- """Returns the number of free blocks available on the specified device.
- Args:
- device (Device): The device for which to query the number of free
- blocks.
- Returns:
- int: The number of free blocks available on the specified device.
- """
- return self._allocators[device].get_num_free_blocks()
- def clear_copy_on_writes(self) -> Dict[int, List[int]]:
- """Clears the copy-on-write (CoW) state and returns the mapping of
- source to destination block IDs.
- Returns:
- Dict[int, List[int]]: A dictionary mapping source block IDs to lists
- of destination block IDs.
- """
- # CoW only supported on GPU
- device = Device.GPU
- return self._allocators[device].clear_copy_on_writes()
- def mark_blocks_as_computed(self) -> None:
- # Prefix caching only supported on GPU.
- device = Device.GPU
- return self._allocators[device].mark_blocks_as_computed()
- def get_common_computed_block_ids(
- self, seq_block_ids: List[List[int]]) -> List[int]:
- # Prefix caching only supported on GPU.
- device = Device.GPU
- return self._allocators[device].get_common_computed_block_ids(
- seq_block_ids)
- def all_block_ids(self) -> frozenset[int]:
- return frozenset(self._block_ids_to_allocator.keys())
|