浏览代码

cow => dict[int, list] -> list

AlpinDale 8 月之前
父节点
当前提交
148aca8ff1

+ 10 - 13
aphrodite/processing/block/common.py

@@ -1,5 +1,4 @@
-from collections import defaultdict
-from typing import Dict, Iterable, List, Optional, Protocol
+from typing import Dict, Iterable, List, Optional, Protocol, Tuple
 
 from aphrodite.processing.block.interfaces import Block, BlockAllocator
 
@@ -111,7 +110,7 @@ class CopyOnWriteTracker:
         refcounter: RefCounterProtocol,
         allocator: BlockAllocator,
     ):
-        self._copy_on_writes: Dict[BlockId, List[BlockId]] = defaultdict(list)
+        self._copy_on_writes: List[Tuple[BlockId, BlockId]] = []
         self._refcounter = refcounter
         self._allocator = allocator
 
@@ -152,25 +151,23 @@ class CopyOnWriteTracker:
             # Track src/dst copy.
             assert src_block_id is not None
             assert block_id is not None
-            self._copy_on_writes[src_block_id].append(block_id)
+            self._copy_on_writes.append((src_block_id, block_id))
 
         return block_id
 
-    def clear_cows(self) -> Dict[BlockId, List[BlockId]]:
+    def clear_cows(self) -> List[Tuple[BlockId, BlockId]]:
         """Clears the copy-on-write tracking information and returns the current
         state.
-
-        This method returns a dictionary mapping source block indices to lists
-        of destination block indices for the current copy-on-write operations.
+        This method returns a list mapping source block indices to
+         destination block indices for the current copy-on-write operations.
         It then clears the internal tracking information.
-
         Returns:
-            Dict[BlockId, List[BlockId]]: A dictionary mapping source
-                block indices to lists of destination block indices for the
+            List[Tuple[BlockId, BlockId]]: A list mapping source
+                block indices to destination block indices for the
                 current copy-on-write operations.
         """
-        cows = dict(self._copy_on_writes)
-        self._copy_on_writes.clear()
+        cows = self._copy_on_writes
+        self._copy_on_writes = []
         return cows
 
 

+ 4 - 4
aphrodite/processing/block/cpu_gpu_block_allocator.py

@@ -1,4 +1,4 @@
-from typing import Dict, FrozenSet, List, Optional
+from typing import Dict, FrozenSet, List, Optional, Tuple
 
 from aphrodite.common.utils import Device
 from aphrodite.processing.block.interfaces import (Block, BlockAllocator,
@@ -188,13 +188,13 @@ class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
     def get_num_total_blocks(self, device: Device) -> int:
         return self._allocators[device].get_num_total_blocks()
 
-    def clear_copy_on_writes(self) -> Dict[int, List[int]]:
+    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
         """Clears the copy-on-write (CoW) state and returns the mapping of
             source to destination block IDs.
 
         Returns:
-            Dict[int, List[int]]: A dictionary mapping source block IDs to lists
-                of destination block IDs.
+            List[Tuple[int, int]]: A list mapping source block IDs to 
+                destination block IDs.
         """
         # CoW only supported on GPU
         device = Device.GPU

+ 3 - 3
aphrodite/processing/block/interfaces.py

@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Dict, FrozenSet, List, Optional, Protocol
+from typing import FrozenSet, List, Optional, Protocol, Tuple
 
 from aphrodite.common.utils import Device
 
@@ -122,7 +122,7 @@ class BlockAllocator(ABC):
         pass
 
     @abstractmethod
-    def clear_copy_on_writes(self) -> Dict[int, List[int]]:
+    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
         pass
 
     @abstractmethod
@@ -187,7 +187,7 @@ class DeviceAwareBlockAllocator(ABC):
         pass
 
     @abstractmethod
-    def clear_copy_on_writes(self) -> Dict[int, List[int]]:
+    def clear_copy_on_writes(self) -> List[Tuple[int, int]]:
         pass
 
     @abstractmethod

+ 4 - 4
aphrodite/processing/block/naive_block.py

@@ -1,4 +1,4 @@
-from typing import Dict, FrozenSet, Iterable, List, Optional, Set
+from typing import FrozenSet, Iterable, List, Optional, Set, Tuple
 
 from aphrodite.processing.block.common import (CopyOnWriteTracker, RefCounter,
                                                get_all_blocks_recursively)
@@ -176,12 +176,12 @@ class NaiveBlockAllocator(BlockAllocator):
         """
         return self._cow_tracker.cow_block_if_not_appendable(block)
 
-    def clear_copy_on_writes(self) -> Dict[BlockId, List[BlockId]]:
+    def clear_copy_on_writes(self) -> List[Tuple[BlockId, BlockId]]:
         """Returns the copy-on-write source->destination mapping and clears it.
 
         Returns:
-            Dict[BlockId, List[BlockId]]: A dictionary mapping source
-                block indices to lists of destination block indices.
+            List[Tuple[BlockId, BlockId]]: A list mapping source
+                block indices to destination block indices.
         """
         return self._cow_tracker.clear_cows()
 

+ 4 - 4
aphrodite/processing/block/prefix_caching_block.py

@@ -1,7 +1,7 @@
 """Token blocks."""
 from itertools import takewhile
 from os.path import commonprefix
-from typing import Dict, FrozenSet, Iterable, List, Optional
+from typing import Dict, FrozenSet, Iterable, List, Optional, Tuple
 
 from aphrodite.processing.block.common import (CopyOnWriteTracker,
                                                get_all_blocks_recursively)
@@ -340,12 +340,12 @@ class PrefixCachingBlockAllocator(BlockAllocator):
         """
         return self._cow_tracker.cow_block_if_not_appendable(block)
 
-    def clear_copy_on_writes(self) -> Dict[BlockId, List[BlockId]]:
+    def clear_copy_on_writes(self) -> List[Tuple[BlockId, BlockId]]:
         """Returns the copy-on-write source->destination mapping and clears it.
 
         Returns:
-            Dict[BlockId, List[BlockId]]: A dictionary mapping source
-                block indices to lists of destination block indices.
+            List[Tuple[BlockId, BlockId]]: A list mapping source
+                block indices to destination block indices.
         """
         return self._cow_tracker.clear_cows()
 

+ 5 - 5
aphrodite/processing/block_manager_v1.py

@@ -5,7 +5,7 @@ from itertools import count, takewhile
 from os.path import commonprefix
 from typing import Dict, List, Optional
 from typing import Sequence as GenericSequence
-from typing import Set
+from typing import Set, Tuple
 
 from loguru import logger
 
@@ -386,7 +386,7 @@ class BlockSpaceManagerV1(BlockSpaceManager):
         self,
         seq: Sequence,
         num_lookahead_slots: int = 0,
-    ) -> Dict[int, List[int]]:
+    ) -> List[Tuple[int, int]]:
         """Allocate a physical slot for a new token."""
         logical_blocks = seq.logical_token_blocks
         block_table = self.block_tables[seq.seq_id]
@@ -405,7 +405,7 @@ class BlockSpaceManagerV1(BlockSpaceManager):
                 # Allocate a new physical block.
                 new_block = self._allocate_last_physical_block(seq)
                 block_table.append(new_block)
-                return {}
+                return []
 
         # We want to append the token to the last physical block.
         last_block = block_table[-1]
@@ -418,7 +418,7 @@ class BlockSpaceManagerV1(BlockSpaceManager):
                 maybe_new_block = self._maybe_promote_last_block(
                     seq, last_block)
                 block_table[-1] = maybe_new_block
-            return {}
+            return []
         else:
             # The last block is shared with other sequences.
             # Copy on Write: Allocate a new block and copy the tokens.
@@ -426,7 +426,7 @@ class BlockSpaceManagerV1(BlockSpaceManager):
 
             block_table[-1] = new_block
             self.gpu_allocator.free(last_block)
-            return {last_block.block_number: [new_block.block_number]}
+            return [(last_block.block_number, new_block.block_number)]
 
     def fork(self, parent_seq: Sequence, child_seq: Sequence) -> None:
         # NOTE: fork does not allocate a new physical block.

+ 2 - 1
aphrodite/processing/block_manager_v2.py

@@ -1,6 +1,7 @@
 """A block manager that manages token blocks."""
 from typing import Dict, List, Optional
 from typing import Sequence as GenericSequence
+from typing import Tuple
 
 from aphrodite.common.sequence import Sequence, SequenceGroup, SequenceStatus
 from aphrodite.common.utils import Device
@@ -163,7 +164,7 @@ class BlockSpaceManagerV2(BlockSpaceManager):
         self,
         seq: Sequence,
         num_lookahead_slots: int,
-    ) -> Dict[int, List[int]]:
+    ) -> List[Tuple[int, int]]:
 
         block_table = self.block_tables[seq.seq_id]
 

+ 2 - 1
aphrodite/processing/interfaces.py

@@ -2,6 +2,7 @@ import enum
 from abc import ABC, abstractmethod
 from typing import Dict, List
 from typing import Sequence as GenericSequence
+from typing import Tuple
 
 from aphrodite.common.sequence import Sequence, SequenceGroup
 
@@ -56,7 +57,7 @@ class BlockSpaceManager(ABC):
         self,
         seq: Sequence,
         num_lookahead_slots: int,
-    ) -> Dict[int, List[int]]:
+    ) -> List[Tuple[int, int]]:
         pass
 
     @abstractmethod

+ 1 - 4
aphrodite/processing/scheduler.py

@@ -1025,10 +1025,7 @@ class Scheduler:
 
         for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
             cows = self.block_manager.append_slots(seq, num_lookahead_slots)
-
-            for src, dests in cows.items():
-                for dest in dests:
-                    blocks_to_copy.append((src, dest))
+            blocks_to_copy.extend(cows)
 
     def _preempt(
         self,