@@ -1,17 +1,16 @@
 import copy
+from collections import defaultdict
 import os
 import time
-from functools import partial
-from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple, Union
-
-import psutil
+from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple,
+                    Union)

 from aphrodite.common.config import (CacheConfig, ModelConfig, ParallelConfig,
                                      SchedulerConfig)
 from aphrodite.processing.scheduler import Scheduler, SchedulerOutputs
 from aphrodite.engine.args_tools import EngineArgs
 from aphrodite.engine.metrics import record_metrics
-from aphrodite.engine.ray_tools import RayWorker, initialize_cluster, ray
+from aphrodite.engine.ray_tools import RayWorkerAphrodite, initialize_cluster, ray
 from aphrodite.common.logger import init_logger
 from aphrodite.common.outputs import RequestOutput
 from aphrodite.common.sampling_params import SamplingParams
@@ -20,10 +19,10 @@ from aphrodite.common.sequence import (SamplerOutput, Sequence, SequenceGroup,
                                        SequenceStatus)
 from aphrodite.transformers_utils.tokenizer import (detokenize_incrementally,
                                                     get_tokenizer)
-from aphrodite.common.utils import Counter
+from aphrodite.common.utils import (Counter, set_cuda_visible_devices, get_ip,
+                                    get_open_port)

 if ray:
-    from ray.air.util.torch_dist import init_torch_dist_process_group
     from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

 if TYPE_CHECKING:
@@ -56,10 +55,8 @@ class AphroditeEngine:
             management.
         parallel_config: The configuration related to distributed execution.
         scheduler_config: The configuration related to the request scheduler.
-        distributed_init_method: The initialization method for distributed
-            execution. See `torch.distributed.init_process_group` for details.
-        stage_devices: The list of devices for each stage. Each stage is a list
-            of (rank, node_resource, device) tuples.
+        placement_group: Ray placement group for distributed execution.
+            Required for distributed execution.
         log_stats: Whether to log statistics.
     """
@@ -69,7 +66,6 @@ class AphroditeEngine:
         cache_config: CacheConfig,
         parallel_config: ParallelConfig,
         scheduler_config: SchedulerConfig,
-        distributed_init_method: str,
         placement_group: Optional["PlacementGroup"],
         log_stats: bool,
     ) -> None:
@@ -88,7 +84,6 @@ class AphroditeEngine:
             f"Sampler Seed = {model_config.seed}\n"
             f"Context Length = {model_config.max_model_len}\n"
             f"Enforce Eager Mode = {model_config.enforce_eager}\n"
-            f"KV Cache DataType = {cache_config.cache_dtype}\n"
             f"Seed = {model_config.seed}")
         # TODO: Print more configs in debug mode.
@@ -103,6 +98,7 @@ class AphroditeEngine:
             model_config.tokenizer,
             tokenizer_mode=model_config.tokenizer_mode,
             trust_remote_code=model_config.trust_remote_code,
+            tokenizer_revision=model_config.tokenizer_revision,
             revision=model_config.revision)
         self.seq_counter = Counter()
@@ -114,7 +110,7 @@ class AphroditeEngine:
             os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
             self._init_workers_ray(placement_group)
         else:
-            self._init_workers(distributed_init_method)
+            self._init_workers()

         # Profile the memory usage and initialize the cache.
         self._init_cache()
@@ -129,118 +125,169 @@ class AphroditeEngine:
         # List of (timestamp, num_tokens)
         self.num_generation_tokens: List[Tuple[float, int]] = []

-    def _init_workers(self, distributed_init_method: str):
+    def _init_workers(self):
         # Lazy import the Worker to avoid importing torch.cuda/xformers
         # before CUDA_VISIBLE_DEVICES is set in the Worker
-        from aphrodite.task_handler.worker import Worker  # pylint: disable=import-outside-toplevel
+        # pylint: disable=import-outside-toplevel
+        from aphrodite.task_handler.worker import Worker

         assert self.parallel_config.world_size == 1, (
             "Ray is required if parallel_config.world_size > 1.")

         self.workers: List[Worker] = []
-        worker = Worker(
+        distributed_init_method = f"tcp://{get_ip()}:{get_open_port()}"
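+        # A rendezvous address is still needed here: the Worker uses it to
+        # initialize its (single-process) torch.distributed group.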
+        self.driver_worker = Worker(
             self.model_config,
             self.parallel_config,
             self.scheduler_config,
-            0,
-            distributed_init_method,
-        )
-        self.workers.append(worker)
-        self._run_workers(
-            "init_model",
-            get_all_outputs=True,
-        )
-        self._run_workers(
-            "load_model",
-            get_all_outputs=True,
-            max_concurrent_workers=self.parallel_config.
-            max_parallel_loading_workers,
+            local_rank=0,
+            rank=0,
+            distributed_init_method=distributed_init_method,
+            is_driver_worker=True,
         )
+        self._run_workers("init_model")
+        self._run_workers("load_model")

     def _init_workers_ray(self, placement_group: "PlacementGroup",
                           **ray_remote_kwargs):
-        # Lazy import the Worker to avoid importing torch.cuda/xformers
-        # before CUDA_VISIBLE_DEVICES is set in the Worker
-        from aphrodite.task_handler.worker import Worker  # pylint: disable=import-outside-toplevel
+        if self.parallel_config.tensor_parallel_size == 1:
+            num_gpus = self.cache_config.gpu_memory_utilization
+        else:
+            num_gpus = 1
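+        # NOTE: a fractional num_gpus lets Ray schedule the worker onto a GPU
+        # shared with the driver process when tensor parallelism is off; with
+        # TP > 1 every worker claims a whole GPU.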

-        self.workers: List[Worker] = []
-        for bundle in placement_group.bundle_specs:
+        self.driver_dummy_worker: RayWorkerAphrodite = None
+        self.workers: List[RayWorkerAphrodite] = []
+
+        driver_ip = get_ip()
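+        # Workers that land on the driver's node are detected below by
+        # comparing node IPs; the first match becomes the driver's resource
+        # holder (driver_dummy_worker).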
+        for bundle_id, bundle in enumerate(placement_group.bundle_specs):
             if not bundle.get("GPU", 0):
                 continue
-            if self.parallel_config.tensor_parallel_size == 1:
-                num_gpus = self.cache_config.gpu_memory_utilization
-            else:
-                num_gpus = 1
+            scheduling_strategy = PlacementGroupSchedulingStrategy(
+                placement_group=placement_group,
+                placement_group_capture_child_tasks=True,
+                placement_group_bundle_index=bundle_id,
+            )
             worker = ray.remote(
                 num_cpus=0,
                 num_gpus=num_gpus,
-                scheduling_strategy=PlacementGroupSchedulingStrategy(
-                    placement_group=placement_group,
-                    placement_group_capture_child_tasks=True),
+                scheduling_strategy=scheduling_strategy,
                 **ray_remote_kwargs,
-            )(RayWorker).remote(self.model_config.trust_remote_code)
-            self.workers.append(worker)
+            )(RayWorkerAphrodite).remote(self.model_config.trust_remote_code)
+
+            worker_ip = ray.get(worker.get_node_ip.remote())
+            if worker_ip == driver_ip and self.driver_dummy_worker is None:
+                # If the worker is on the same node as the driver, we use it
+                # as the resource holder for the driver process.
+                self.driver_dummy_worker = worker
+            else:
+                self.workers.append(worker)
+
+        if self.driver_dummy_worker is None:
+            raise ValueError(
+                "Ray does not allocate any GPUs on the driver node. Consider "
+                "adjusting the Ray placement group or running the driver on a "
+                "GPU node.")
+
+        driver_node_id, driver_gpu_ids = ray.get(
+            self.driver_dummy_worker.get_node_and_gpu_ids.remote())
+        worker_node_and_gpu_ids = ray.get(
+            [worker.get_node_and_gpu_ids.remote() for worker in self.workers])
+
+        node_workers = defaultdict(list)
+        node_gpus = defaultdict(list)
+
+        node_workers[driver_node_id].append(0)
+        node_gpus[driver_node_id].extend(driver_gpu_ids)
+        for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids,
+                                               start=1):
+            node_workers[node_id].append(i)
+            node_gpus[node_id].extend(gpu_ids)
+        for node_id, gpu_ids in node_gpus.items():
+            node_gpus[node_id] = sorted(gpu_ids)
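+        # node_workers maps each node to the ranks placed on it; node_gpus
+        # maps each node to the sorted GPU ids available there.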
+
+        # Set CUDA_VISIBLE_DEVICES for the driver.
+        set_cuda_visible_devices(node_gpus[driver_node_id])
+        for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids):
+            worker.set_cuda_visible_devices.remote(node_gpus[node_id])
+
+        distributed_init_method = f"tcp://{driver_ip}:{get_open_port()}"
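+        # Every rank, the driver (rank 0) included, joins one torch
+        # distributed process group at this address.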
+
+        # Lazy import the Worker to avoid importing torch.cuda/xformers
+        # before CUDA_VISIBLE_DEVICES is set in the Worker
+        # pylint: disable=import-outside-toplevel
+        from aphrodite.task_handler.worker import Worker

         # Initialize torch distributed process group for the workers.
-        init_torch_dist_process_group(self.workers, backend="nccl")
         model_config = copy.deepcopy(self.model_config)
         parallel_config = copy.deepcopy(self.parallel_config)
         scheduler_config = copy.deepcopy(self.scheduler_config)
-        self._run_workers("init_worker",
-                          get_all_outputs=True,
-                          worker_init_fn=lambda: Worker(
-                              model_config,
-                              parallel_config,
-                              scheduler_config,
-                              None,
-                              None,
-                          ))
-        self._run_workers(
-            "init_model",
-            get_all_outputs=True,
+
+        for rank, (worker, (node_id,
+                            _)) in enumerate(zip(self.workers,
+                                                 worker_node_and_gpu_ids),
+                                             start=1):
+            local_rank = node_workers[node_id].index(rank)
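+            # Binding rank/local_rank as lambda defaults freezes them per
+            # iteration; a plain closure would give every worker the values
+            # from the last loop pass.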
+            worker.init_worker.remote(
+                lambda rank=rank, local_rank=local_rank: Worker(
+                    model_config,
+                    parallel_config,
+                    scheduler_config,
+                    local_rank,
+                    rank,
+                    distributed_init_method,
+                ))
+
+        driver_rank = 0
+        driver_local_rank = node_workers[driver_node_id].index(driver_rank)
+        self.driver_worker = Worker(
+            model_config,
+            parallel_config,
+            scheduler_config,
+            driver_local_rank,
+            driver_rank,
+            distributed_init_method,
+            is_driver_worker=True,
         )
+
+        self._run_workers("init_model")
         self._run_workers(
             "load_model",
-            get_all_outputs=True,
             max_concurrent_workers=self.parallel_config.
             max_parallel_loading_workers,
         )

-        # HACK
-        # After running ray.init(), ray processes affinity is set to (0,1).
-        # (or whatever the CPU scheduler fancies)
-        # We however want the actual workers that are being used,
-        # so we call here since calling after ray.init() and everything else.
-        # We reassign each ray process by taking the
-        # modulus of the number of cpu_cores available.
-        # Issue: https://github.com/PygmalionAI/aphrodite-engine/issues/115
-        # The solution is similar to the taskset solution linked above.
-        current_process = psutil.Process()
-        ray_threads = 0
-        logical_cores = psutil.cpu_count(logical=True)
-        physical_cores = psutil.cpu_count(logical=False)
-        ht_scale = physical_cores / logical_cores
-        for process in current_process.children(recursive=True):
-            # process.pid
-            if "ray::" in process.name():
-                process.cpu_affinity([ray_threads])
-                ray_threads += int(1 * ht_scale) if ht_scale > 1.0 else 1
-                ray_threads = ray_threads % logical_cores
-
     def _verify_args(self) -> None:
         self.model_config.verify_with_parallel_config(self.parallel_config)
         self.cache_config.verify_with_parallel_config(self.parallel_config)

     def _init_cache(self) -> None:
-        """Profiles the memory usage and initializes the KV cache."""
+        """Profiles the memory usage and initializes the KV cache.
+
+        The engine first profiles the existing memory usage.
+        Then, it calculates the maximum possible number of GPU and CPU blocks
+        that can be allocated with the remaining free memory.
+        More details can be found in the
+        # pylint: disable=line-too-long
+        :meth:`~aphrodite.task_handler.worker.Worker.profile_num_available_blocks` method
+        from class :class:`~aphrodite.task_handler.Worker`.
+
+        Afterwards, as there may be multiple workers,
+        we take the minimum number of blocks across all workers
+        to ensure this can be applied to all of them.
+
+        Finally, the engine will initialize the KV cache
+        with the calculated number of blocks.
+
+        .. tip::
+            You may limit the usage of GPU memory
+            by adjusting the `gpu_memory_utilization` parameter.
+        """
         # Get the maximum number of blocks that can be allocated on GPU and CPU.
         num_blocks = self._run_workers(
             "profile_num_available_blocks",
-            get_all_outputs=True,
             block_size=self.cache_config.block_size,
             gpu_memory_utilization=self.cache_config.gpu_memory_utilization,
             cpu_swap_space=self.cache_config.swap_space_bytes,
-            cache_dtype=self.cache_config.cache_dtype,
         )

         # Since we use a shared centralized controller, we take the minimum
@@ -256,7 +303,6 @@ class AphroditeEngine:
             raise ValueError("No available memory for the cache blocks. "
                              "Try increasing `gpu_memory_utilization` when "
                              "initializing the engine.")
-
         max_seq_len = self.cache_config.block_size * num_gpu_blocks
         if self.model_config.max_model_len > max_seq_len:
             raise ValueError(
@@ -272,7 +318,7 @@ class AphroditeEngine:
         # Initialize the cache.
         self._run_workers("init_cache_engine", cache_config=self.cache_config)
         # Warm up the model. This includes capturing the model into CUDA graph
-        # if enforce_eager is set to False.
+        # if enforce_eager is False.
         self._run_workers("warm_up_model")

     @classmethod
@@ -282,11 +328,9 @@ class AphroditeEngine:
         engine_configs = engine_args.create_engine_configs()
         parallel_config = engine_configs[2]
         # Initialize the cluster.
-        distributed_init_method, placement_group = initialize_cluster(
-            parallel_config)
+        placement_group = initialize_cluster(parallel_config)
         # Create the LLM engine.
         engine = cls(*engine_configs,
-                     distributed_init_method,
                      placement_group,
                      log_stats=not engine_args.disable_log_stats)
         return engine
@@ -298,6 +342,7 @@ class AphroditeEngine:
         sampling_params: SamplingParams,
         prompt_token_ids: Optional[List[int]] = None,
         arrival_time: Optional[float] = None,
+        prefix_pos: Optional[int] = None,
     ) -> None:
         """Add a request to the engine's request pool.
@@ -313,10 +358,39 @@ class AphroditeEngine:
             prompt_token_ids: The token IDs of the prompt. If None, we
                 use the tokenizer to convert the prompts to token IDs.
             arrival_time: The arrival time of the request. If None, we use
-                the current time.
+                the current monotonic time.
+            prefix_pos: If not None, we use the given position as the prefix
+                position for each prompt. We will cache the prefix's KV
+                cache and reuse it for the next request with the same prefix.
+                This is an experimental feature, and may be replaced with
+                automatic prefix caching in the future.
+
+        Details:
+            - Set arrival_time to the current time if it is None.
+            - Set prompt_token_ids to the encoded prompt if it is None.
+            - Create `best_of` number of :class:`~aphrodite.Sequence` objects.
+            - Create a :class:`~aphrodite.SequenceGroup` object
+              from the list of :class:`~aphrodite.Sequence`.
+            - Add the :class:`~aphrodite.SequenceGroup` object to the scheduler.
+
+        Example:
+            >>> # initialize engine
+            >>> engine = AphroditeEngine.from_engine_args(engine_args)
+            >>> # set request arguments
+            >>> example_prompt = "Who is the president of the United States?"
+            >>> sampling_params = SamplingParams(temperature=0.0)
+            >>> request_id = 0
+            >>>
+            >>> # add the request to the engine
+            >>> engine.add_request(
+            >>>    str(request_id),
+            >>>    example_prompt,
+            >>>    sampling_params)
+            >>> # continue the request processing
+            >>> ...
         """
         if arrival_time is None:
-            arrival_time = time.time()
+            arrival_time = time.monotonic()
         if prompt_token_ids is None:
             assert prompt is not None
             prompt_token_ids = self.tokenizer.encode(prompt)
@@ -326,9 +400,13 @@ class AphroditeEngine:
         seq_id = next(self.seq_counter)
         seq = Sequence(seq_id, prompt, prompt_token_ids, block_size)

+        # Check whether the input specifies prefix
+        prefix = self.scheduler.prefix_pool.add_or_get_prefix(
+            prompt_token_ids[:prefix_pos]) if prefix_pos is not None else None
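+        # add_or_get_prefix presumably returns the pool's existing entry for
+        # these token ids, so prompts sharing a prefix reuse one KV prefix.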
+
         # Create the sequence group.
         seq_group = SequenceGroup(request_id, [seq], sampling_params,
-                                  arrival_time)
+                                  arrival_time, prefix)

         # Add the sequence group to the scheduler.
         self.scheduler.add_seq_group(seq_group)
@@ -338,6 +416,17 @@ class AphroditeEngine:

         Args:
             request_id: The ID(s) of the request to abort.
+
+        Details:
+            - Refer to the
+              :meth:`~aphrodite.processing.scheduler.Scheduler.abort_seq_group`
+              from class :class:`~aphrodite.processing.scheduler.Scheduler`.
+
+        Example:
+            >>> # initialize engine and add a request with request_id
+            >>> request_id = str(0)
+            >>> # abort the request
+            >>> engine.abort_request(request_id)
         """
         self.scheduler.abort_seq_group(request_id)
@@ -583,10 +672,18 @@ class AphroditeEngine:

         # Create the outputs.
         request_outputs: List[RequestOutput] = []
-        for seq_group in (scheduled_seq_groups +
-                          scheduler_outputs.ignored_seq_groups):
+        for seq_group in scheduled_seq_groups:
             request_output = RequestOutput.from_seq_group(seq_group)
             request_outputs.append(request_output)
+        for seq_group in scheduler_outputs.ignored_seq_groups:
+            request_output = RequestOutput.from_seq_group(seq_group)
+            request_outputs.append(request_output)
+
+        # Update prefix state, now all the uncomputed prefixes are computed.
+        for seq_group in scheduled_seq_groups:
+            if (seq_group.prefix is not None and seq_group.prefix.allocated
+                    and not seq_group.prefix.computed):
+                seq_group.prefix.computed = True
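+                # Later requests hitting this prefix can now reuse its KV
+                # cache instead of recomputing it during the prompt run.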

         if self.log_stats:
             # Log the system stats.
@@ -597,31 +694,83 @@ class AphroditeEngine:
     def step(self) -> List[RequestOutput]:
         """Performs one decoding iteration and returns newly generated results.

-        This function performs one decoding iteration of the engine. It first
-        schedules the sequences to be executed in the next iteration and the
-        token blocks to be swapped in/out/copy. Then, it executes the model
-        and updates the scheduler with the model outputs. Finally, it decodes
-        the sequences and returns the newly generated results.
+        .. figure:: https://i.imgur.com/sv2HssD.png
+            :alt: Overview of the step function
+            :align: center
+
+            Overview of the step function.
+
+        Details:
+            - Step 1: Schedules the sequences to be executed in the next
+              iteration and the token blocks to be swapped in/out/copied.
+
+                - Depending on the scheduling policy,
+                  sequences may be `preempted/reordered`.
+                - A Sequence Group (SG) refers to a group of sequences
+                  that are generated from the same prompt.
+
+            - Step 2: Calls the workers to execute the model.
+            - Step 3: Processes the model output. This mainly includes:
+
+                - Decodes the relevant outputs.
+                - Updates the scheduled sequence groups with model outputs
+                  based on their `sampling parameters` (`use_beam_search` or not).
+                - Frees the finished sequence groups.
+
+            - Finally, it creates and returns the newly generated results.
+
+        Example:
+            >>> # Please see the example/ folder for more detailed examples.
+            >>>
+            >>> # initialize engine and request arguments
+            >>> engine = AphroditeEngine.from_engine_args(engine_args)
+            >>> example_inputs = [(0, "What is LLM?",
+            >>>    SamplingParams(temperature=0.0))]
+            >>>
+            >>> # Start the engine with an event loop
+            >>> while True:
+            >>>     if example_inputs:
+            >>>         req_id, prompt, sampling_params = example_inputs.pop(0)
+            >>>         engine.add_request(str(req_id), prompt, sampling_params)
+            >>>
+            >>>     # continue the request processing
+            >>>     request_outputs = engine.step()
+            >>>     for request_output in request_outputs:
+            >>>         if request_output.finished:
+            >>>             # return or show the request output
+            >>>
+            >>>     if not (engine.has_unfinished_requests() or example_inputs):
+            >>>         break
         """
         seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()

-        # Execute the model.
-        output = self._run_workers(
-            "execute_model",
-            seq_group_metadata_list=seq_group_metadata_list,
-            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
-            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
-            blocks_to_copy=scheduler_outputs.blocks_to_copy,
-        ) if not scheduler_outputs.is_empty() else []
+        if not scheduler_outputs.is_empty():
+            # Execute the model.
+            all_outputs = self._run_workers(
+                "execute_model",
+                driver_kwargs={
+                    "seq_group_metadata_list": seq_group_metadata_list,
+                    "blocks_to_swap_in": scheduler_outputs.blocks_to_swap_in,
+                    "blocks_to_swap_out": scheduler_outputs.blocks_to_swap_out,
+                    "blocks_to_copy": scheduler_outputs.blocks_to_copy,
+                })
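+            # NOTE: the Ray workers are invoked with no arguments here; only
+            # the driver receives the scheduler outputs (see _run_workers).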
+
+            # Only the driver worker returns the sampling results.
+            output = all_outputs[0]
+        else:
+            output = []

         return self._process_model_outputs(output, scheduler_outputs)

+    def do_log_stats(self) -> None:
+        self._log_system_stats(False, 0)
+
     def _log_system_stats(
         self,
         prompt_run: bool,
         num_batched_tokens: int,
     ) -> None:
-        now = time.time()
+        now = time.monotonic()
         # Log the number of batched input tokens.
         if prompt_run:
             self.num_prompt_tokens.append((now, num_batched_tokens))
@@ -699,7 +848,8 @@ class AphroditeEngine:
             prefix_offset=seq.prefix_offset,
             read_offset=seq.read_offset,
             skip_special_tokens=prms.skip_special_tokens,
-            spaces_between_special_tokens=prms.spaces_between_special_tokens)
+            spaces_between_special_tokens=prms.spaces_between_special_tokens,
+        )
         if seq.tokens is None:
             seq.tokens = new_tokens
         else:
@@ -715,7 +865,7 @@ class AphroditeEngine:
             if seq.output_text.endswith(stop_str):
                 if not sampling_params.include_stop_str_in_output:
                     # Truncate the output text so that the stop string is
-                    # not included in the output
+                    # not included in the output.
                     seq.output_text = seq.output_text[:-len(stop_str)]
                 seq.status = SequenceStatus.FINISHED_STOPPED
                 return
@@ -739,54 +889,38 @@ class AphroditeEngine:
             seq.status = SequenceStatus.FINISHED_STOPPED
             return

-    def _run_workers_in_batch(
-        self,
-        workers,
-        method: str,
-        *args,
-        **kwargs,
-    ):
-        all_outputs = []
-        for worker in workers:
-            if self.parallel_config.worker_use_ray:
-                executor = partial(worker.execute_method.remote, method)
-            else:
-                executor = getattr(worker, method)
-
-            output = executor(*args, **kwargs)
-            all_outputs.append(output)
-
-        if self.parallel_config.worker_use_ray:
-            all_outputs = ray.get(all_outputs)
-        return all_outputs
-
     def _run_workers(
         self,
         method: str,
         *args,
-        get_all_outputs: bool = False,
+        driver_args: Optional[List[Any]] = None,
+        driver_kwargs: Optional[Dict[str, Any]] = None,
         max_concurrent_workers: Optional[int] = None,
         **kwargs,
     ) -> Any:
-        """Runs a method on all workers."""
-        all_outputs = []
+        """Runs the given method on all workers."""
+
         if max_concurrent_workers:
-            work_groups = [
-                self.workers[i:i + max_concurrent_workers]
-                for i in range(0, len(self.workers), max_concurrent_workers)
-            ]
-        else:
-            work_groups = [self.workers]
+            raise NotImplementedError(
+                "max_concurrent_workers is not supported yet.")
+
+        # Start the ray workers first.
+        ray_worker_outputs = [
+            worker.execute_method.remote(method, *args, **kwargs)
+            for worker in self.workers
+        ]
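+        # execute_method.remote returns ObjectRefs immediately; results are
+        # fetched with ray.get below, after the driver has run its own call.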
+
+        if driver_args is None:
+            driver_args = args
+        if driver_kwargs is None:
+            driver_kwargs = kwargs

-        for workers in work_groups:
-            all_outputs.extend(
-                self._run_workers_in_batch(workers, method, *args, **kwargs))
+        # Start the driver worker after all the ray workers.
+        driver_worker_output = getattr(self.driver_worker,
+                                       method)(*driver_args, **driver_kwargs)

-        if get_all_outputs:
-            return all_outputs
+        # Get the results of the ray workers.
+        if self.workers:
+            ray_worker_outputs = ray.get(ray_worker_outputs)

-        # Make sure all workers have the same results.
-        output = all_outputs[0]
-        for other_output in all_outputs[1:]:
-            assert output == other_output
-        return output
+        return [driver_worker_output] + ray_worker_outputs