123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- from abc import ABC, abstractmethod
- from typing import Dict, FrozenSet, List, Optional, Protocol, Tuple
- from aphrodite.common.utils import Device
# Block identifiers are plain integers; the alias names that intent in
# signatures that return or accept a block id.
BlockId = int
class Block(ABC):
    """Abstract interface for a block of token ids.

    Every member is abstract, so concrete subclasses must override all of
    them. The few bodies kept here (the ``block_id`` setter and
    ``content_hash``) are defaults a subclass can reach through ``super()``.
    """

    @abstractmethod
    def append_token_ids(self, token_ids: List[int]) -> None:
        """Append ``token_ids`` to this block."""

    @property
    @abstractmethod
    def block_id(self) -> Optional[int]:
        """Identifier of this block; may be ``None``."""

    @block_id.setter
    @abstractmethod
    def block_id(self, value: Optional[int]) -> None:
        """NOTE: Do not use this API outside Block.

        The base implementation stores ``value`` on ``self._block_id``.
        """
        self._block_id = value

    @property
    @abstractmethod
    def token_ids(self) -> List[int]:
        """Token ids of this block."""

    @property
    @abstractmethod
    def num_tokens_total(self) -> int:
        """The number of tokens till the current block (inclusive)."""

    @property
    @abstractmethod
    def num_empty_slots(self) -> int:
        """Number of empty slots in this block."""

    @property
    @abstractmethod
    def is_full(self) -> bool:
        """Whether this block is full."""

    @property
    @abstractmethod
    def prev_block(self) -> Optional["Block"]:
        """The previous block, or ``None``."""

    @property
    @abstractmethod
    def computed(self) -> bool:
        """Computed flag of this block.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    @computed.setter
    @abstractmethod
    def computed(self, value: bool) -> None:
        """Set the computed flag.

        Should only be used by the prefix-caching allocator.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def last_accessed(self) -> float:
        """Last-accessed timestamp of this block.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    @last_accessed.setter
    @abstractmethod
    def last_accessed(self, last_accessed_ts: float) -> None:
        """Set the last-accessed timestamp.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError

    class Factory(Protocol):
        """Callable protocol for objects that build a ``Block``."""

        @abstractmethod
        def __call__(
            self,
            prev_block: Optional["Block"],
            token_ids: List[int],
            block_size: int,
            allocator: "BlockAllocator",
            block_id: Optional[int] = None,
        ) -> "Block":
            """Build a block from the given arguments."""

    @property
    @abstractmethod
    def content_hash(self) -> Optional[int]:
        """Return the content-based hash of the current block, or None if it
        is not yet defined or not supported.

        For the content-based hash to be defined, the current block must be
        full. The base implementation returns ``None``.
        """
        return None
class BlockAllocator(ABC):
    """Abstract interface for a block allocator.

    Every member is abstract. The docstrings restate only what the
    signatures declare; the precise semantics are defined by the concrete
    subclasses.
    """

    @abstractmethod
    def allocate_mutable_block(
        self,
        prev_block: Optional[Block],
    ) -> Block:
        """Allocate a mutable block given an optional ``prev_block``."""

    @abstractmethod
    def allocate_immutable_block(
        self,
        prev_block: Optional[Block],
        token_ids: List[int],
    ) -> Block:
        """Allocate an immutable block for ``token_ids``."""

    @abstractmethod
    def allocate_immutable_blocks(
        self,
        prev_block: Optional[Block],
        block_token_ids: List[List[int]],
    ) -> List[Block]:
        """Allocate immutable blocks from a list of token-id lists."""

    @abstractmethod
    def free(self, block: Block) -> None:
        """Free ``block``."""

    @abstractmethod
    def fork(self, last_block: Block) -> List[Block]:
        """Fork from ``last_block`` and return a list of blocks."""

    @abstractmethod
    def get_num_total_blocks(self) -> int:
        """Return the total number of blocks."""

    @abstractmethod
    def get_num_free_blocks(self) -> int:
        """Return the number of free blocks."""

    @abstractmethod
    def get_physical_block_id(self, absolute_id: int) -> int:
        """Return the physical block id for ``absolute_id``."""

    @abstractmethod
    def swap_out(self, blocks: List[Block]) -> None:
        """Swap out ``blocks``."""

    @abstractmethod
    def swap_in(self, blocks: List[Block]) -> None:
        """Swap in ``blocks``."""

    @property
    @abstractmethod
    def all_block_ids(self) -> FrozenSet[int]:
        """Frozen set of all block ids."""

    @abstractmethod
    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
        """Clear the copy-on-writes and return them as pairs of ints."""
        # NOTE(review): presumably (source id, destination id) pairs --
        # confirm against a concrete allocator.

    @abstractmethod
    def mark_blocks_as_accessed(
        self,
        block_ids: List[int],
        now: float,
    ) -> None:
        """Mark ``block_ids`` as accessed at time ``now``."""

    @abstractmethod
    def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
        """Mark ``block_ids`` as computed."""

    @abstractmethod
    def get_computed_block_ids(
        self,
        prev_computed_block_ids: List[int],
        block_ids: List[int],
        skip_last_block_id: bool,
    ) -> List[int]:
        """Return the computed block ids among ``block_ids``."""

    @abstractmethod
    def get_common_computed_block_ids(
        self,
        computed_seq_block_ids: List[List[int]],
    ) -> List[int]:
        """Return the computed block ids common to the given sequences."""

    @abstractmethod
    def cow_block_if_not_appendable(self, block: Block) -> BlockId:
        """NOTE: This should not be used besides Block"""

    @abstractmethod
    def promote_to_immutable_block(self, block: Block) -> BlockId:
        """NOTE: This should not be used besides Block"""

    @abstractmethod
    def get_num_blocks_touched(
        self,
        blocks: List[Block],
        num_lookahead_slots: int = 0,
    ) -> int:
        """Return the number of blocks touched for ``blocks``."""

    # NOTE(review): kept as a nested class of BlockAllocator -- confirm that
    # callers reference it as ``BlockAllocator.NoFreeBlocksError``.
    class NoFreeBlocksError(ValueError):
        """``ValueError`` subclass defined for allocator failures."""
class DeviceAwareBlockAllocator(ABC):
    """Abstract interface for an allocator whose calls can name a ``Device``.

    Every member is abstract. The docstrings restate only what the
    signatures declare; the precise semantics are defined by the concrete
    subclasses.
    """

    @abstractmethod
    def allocate_mutable_block(
        self,
        prev_block: Optional[Block],
        device: Device,
    ) -> Block:
        """Allocate a mutable block on ``device``."""

    @abstractmethod
    def allocate_immutable_block(
        self,
        prev_block: Optional[Block],
        token_ids: List[int],
        device: Device,
    ) -> Block:
        """Allocate an immutable block for ``token_ids`` on ``device``."""

    @abstractmethod
    def allocate_immutable_blocks(
        self,
        prev_block: Optional[Block],
        block_token_ids: List[List[int]],
        device: Device,
    ) -> List[Block]:
        """Allocate immutable blocks on ``device`` from token-id lists."""

    @abstractmethod
    def get_num_free_blocks(self, device: Device) -> int:
        """Return the number of free blocks for ``device``."""

    @abstractmethod
    def get_num_total_blocks(self, device: Device) -> int:
        """Return the total number of blocks for ``device``."""

    @abstractmethod
    def free(self, block: Block) -> None:
        """Free ``block``."""

    @abstractmethod
    def fork(self, last_block: Block) -> List[Block]:
        """Fork from ``last_block`` and return a list of blocks."""

    @property
    @abstractmethod
    def all_block_ids(self) -> FrozenSet[int]:
        """Frozen set of all block ids."""

    @abstractmethod
    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
        """Clear the copy-on-writes and return them as pairs of ints."""
        # NOTE(review): presumably (source id, destination id) pairs --
        # confirm against a concrete allocator.

    @abstractmethod
    def mark_blocks_as_accessed(
        self,
        block_ids: List[int],
        now: float,
    ) -> None:
        """Mark ``block_ids`` as accessed at time ``now``."""

    @abstractmethod
    def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
        """Mark ``block_ids`` as computed."""

    @abstractmethod
    def get_computed_block_ids(
        self,
        prev_computed_block_ids: List[int],
        block_ids: List[int],
        skip_last_block_id: bool,
    ) -> List[int]:
        """Return the computed block ids among ``block_ids``."""

    @abstractmethod
    def get_common_computed_block_ids(
        self,
        computed_seq_block_ids: List[List[int]],
    ) -> List[int]:
        """Return the computed block ids common to the given sequences."""

    @abstractmethod
    def get_num_blocks_touched(
        self,
        blocks: List[Block],
        device: Device,
        num_lookahead_slots: int = 0,
    ) -> int:
        """Return the number of blocks touched for ``blocks``."""

    @abstractmethod
    def swap(
        self,
        blocks: List[Block],
        src_device: Device,
        dst_device: Device,
    ) -> Dict[int, int]:
        """Swap ``blocks`` from ``src_device`` to ``dst_device``.

        Returns an int-to-int mapping.
        """
        # NOTE(review): presumably source block id -> destination block
        # id -- confirm against a concrete allocator.

    @abstractmethod
    def get_physical_block_id(
        self,
        device: Device,
        absolute_id: int,
    ) -> int:
        """Return the physical block id for ``absolute_id``."""

    @abstractmethod
    def allocate_or_get_null_block(self) -> Block:
        """Return a null block.

        Null blocks are used as placeholders for KV cache blocks that have
        been dropped due to sliding window.
        There is at most one null block per allocator.
        """
|