|
- import time
- from typing import Iterable, List, Optional, Type, Union
- from loguru import logger
- from transformers import GenerationConfig, PreTrainedTokenizer
- import aphrodite
- from aphrodite.common.config import (CacheConfig, DecodingConfig, DeviceConfig,
- LoRAConfig, ModelConfig, ParallelConfig,
- SchedulerConfig, SpeculativeConfig,
- VisionLanguageConfig)
- from aphrodite.common.logger import setup_logger
- from aphrodite.common.outputs import RequestOutput
- from aphrodite.common.sampling_params import SamplingParams
- from aphrodite.common.sequence import (MultiModalData, SamplerOutput, Sequence,
- SequenceGroup, SequenceStage)
- from aphrodite.common.utils import Counter
- from aphrodite.engine.args_tools import EngineArgs
- from aphrodite.engine.metrics import StatLogger, Stats
- from aphrodite.engine.output_processor.interfaces import \
- SequenceGroupOutputProcessor
- from aphrodite.engine.output_processor.stop_checker import StopChecker
- from aphrodite.engine.output_processor.util import \
- create_output_by_sequence_group
- from aphrodite.engine.ray_tools import initialize_ray_cluster
- from aphrodite.executor.executor_base import ExecutorBase
- from aphrodite.lora.request import LoRARequest
- from aphrodite.processing.scheduler import Scheduler, SchedulerOutputs
- from aphrodite.transformers_utils.detokenizer import Detokenizer
- from aphrodite.transformers_utils.tokenizer_group import (BaseTokenizerGroup,
- get_tokenizer_group)
- _LOCAL_LOGGING_INTERVAL_SEC = 5
- def _load_generation_config_dict(model_config: ModelConfig):
- try:
- return GenerationConfig.from_pretrained(
- model_config.model,
- revision=model_config.revision,
- ).to_diff_dict()
- except OSError:
- # Not found.
- return {}
- class AphroditeEngine:
- """An LLM engine that receives requests and generates texts.
- This is the main class for the Aphrodite engine. It receives requests
- from clients and generates texts from the LLM. It includes a tokenizer, a
- language model (possibly distributed across multiple GPUs), and GPU memory
- space allocated for intermediate states (aka KV cache). This class utilizes
- iteration-level scheduling and efficient memory management to maximize the
- serving throughput.
- The `LLM` class wraps this class for offline batched inference and the
- `AsyncAphrodite` class wraps this class for online serving.
- NOTE: The config arguments are derived from the `EngineArgs` class. For the
- comprehensive list of arguments, see `EngineArgs`.
- Args:
- model_config: The configuration related to the LLM model.
- cache_config: The configuration related to the KV cache memory
- management.
- parallel_config: The configuration related to distributed execution.
- scheduler_config: The configuration related to the request scheduler.
- device_config: The configuration related to the device.
- lora_config (Optional): The configuration related to serving multi-LoRA.
- vision_language_config (Optional): The configuration related to vision
- language models.
- speculative_config (Optional): The configuration related to speculative
- decoding.
- executor_class: The model executor class for managing distributed
- execution.
- log_stats: Whether to log statistics.
- """
- def __init__(
- self,
- model_config: ModelConfig,
- cache_config: CacheConfig,
- parallel_config: ParallelConfig,
- scheduler_config: SchedulerConfig,
- device_config: DeviceConfig,
- lora_config: Optional[LoRAConfig],
- vision_language_config: Optional[VisionLanguageConfig],
- speculative_config: Optional[SpeculativeConfig],
- decoding_config: Optional[DecodingConfig],
- executor_class: Type[ExecutorBase],
- log_stats: bool,
- ) -> None:
- logger.info(
- f"Initializing the Aphrodite Engine (v{aphrodite.__version__}) "
- "with the following config:\n"
- f"Model = {model_config.model!r}\n"
- f"Speculative Config = {speculative_config!r}\n"
- f"DataType = {model_config.dtype}\n"
- f"Model Load Format = {model_config.load_format}\n"
- f"Number of GPUs = {parallel_config.tensor_parallel_size}\n"
- f"Disable Custom All-Reduce = "
- f"{parallel_config.disable_custom_all_reduce}\n"
- f"Quantization Format = {model_config.quantization}\n"
- f"Context Length = {model_config.max_model_len}\n"
- f"Enforce Eager Mode = {model_config.enforce_eager}\n"
- f"KV Cache Data Type = {cache_config.cache_dtype}\n"
- f"KV Cache Params Path = {model_config.quantization_param_path}\n"
- f"Device = {device_config.device}\n"
- f"Guided Decoding Backend = {decoding_config!r}\n")
- # TODO: Print more configs in debug mode.
- self.model_config = model_config
- self.cache_config = cache_config
- self.lora_config = lora_config
- self.vision_language_config = vision_language_config
- self.parallel_config = parallel_config
- self.scheduler_config = scheduler_config
- self.device_config = device_config
- self.speculative_config = speculative_config
- self.decoding_config = decoding_config or DecodingConfig()
- self.log_stats = log_stats
- self._verify_args()
- self._init_tokenizer()
- self.detokenizer = Detokenizer(self.tokenizer)
- self.seq_counter = Counter()
- self.generation_config_fields = _load_generation_config_dict(
- model_config)
- self.model_executor = executor_class(
- model_config=model_config,
- cache_config=cache_config,
- parallel_config=parallel_config,
- scheduler_config=scheduler_config,
- device_config=device_config,
- lora_config=lora_config,
- vision_language_config=vision_language_config,
- speculative_config=speculative_config,
- )
- self._initialize_kv_caches()
- # Ping the tokenizer to ensure it is loaded if
- # it runs on a separate process.
- self.tokenizer.ping()
- # Create the scheduler.
- # NOTE: the cache_config here have been updated with the numbers of
- # GPU and CPU blocks, which are profiled in the distributed executor.
- self.scheduler = Scheduler(scheduler_config, cache_config, lora_config)
- # Metric Logging.
- if self.log_stats:
- self.stat_logger = StatLogger(
- local_interval=_LOCAL_LOGGING_INTERVAL_SEC,
- labels=dict(model_name=model_config.model),
- )
- self.stat_logger.info("cache_config", self.cache_config)
- # Create sequence output processor, e.g. for beam search or
- # speculative decoding.
- self.output_processor = (
- SequenceGroupOutputProcessor.create_output_processor(
- self.scheduler_config,
- self.detokenizer,
- self.scheduler,
- self.seq_counter,
- self.get_tokenizer_for_seq,
- stop_checker=StopChecker(
- self.scheduler_config.max_model_len,
- self.get_tokenizer_for_seq,
- ),
- ))
- def _initialize_kv_caches(self) -> None:
- """Initialize the KV cache in the worker(s).
- The workers will determine the number of blocks in both the GPU cache
- and the swap CPU cache.
- """
- num_gpu_blocks, num_cpu_blocks = (
- self.model_executor.determine_num_available_blocks())
- if self.cache_config.num_gpu_blocks_override is not None:
- num_gpu_blocks_override = self.cache_config.num_gpu_blocks_override
- logger.info(f"Overriding {num_gpu_blocks=} with "
- f"{num_gpu_blocks_override=}")
- num_gpu_blocks = num_gpu_blocks_override
- self.cache_config.num_gpu_blocks = num_gpu_blocks
- self.cache_config.num_cpu_blocks = num_cpu_blocks
- self.model_executor.initialize_cache(num_gpu_blocks, num_cpu_blocks)
- @classmethod
- def from_engine_args(cls, engine_args: EngineArgs) -> "AphroditeEngine":
- """Creates an LLM engine from the engine arguments."""
- # Create the engine configs.
- engine_config = engine_args.create_engine_config()
- # Initialize the cluster and specify the executor class.
- if engine_config.device_config.device_type == "neuron":
- from aphrodite.executor.neuron_executor import NeuronExecutor
- executor_class = NeuronExecutor
- elif engine_config.device_config.device_type == "cpu":
- from aphrodite.executor.cpu_executor import CPUExecutor
- executor_class = CPUExecutor
- elif engine_config.parallel_config.worker_use_ray:
- initialize_ray_cluster(engine_config.parallel_config)
- from aphrodite.executor.ray_gpu_executor import RayGPUExecutor
- executor_class = RayGPUExecutor
- else:
- assert engine_config.parallel_config.world_size == 1, (
- "Ray is required if parallel_config.world_size > 1.")
- from aphrodite.executor.gpu_executor import GPUExecutor
- executor_class = GPUExecutor
- # Create the LLM engine.
- engine = cls(**engine_config.to_dict(),
- executor_class=executor_class,
- log_stats=not engine_args.disable_log_stats)
- return engine
- def __reduce__(self):
- # This is to ensure that the AphroditeEngine is not referenced in
- # the closure used to initialize Ray worker actors
- raise RuntimeError("AphroditeEngine should not be pickled!")
- def get_tokenizer(self) -> "PreTrainedTokenizer":
- return self.tokenizer.get_lora_tokenizer(None)
- def get_tokenizer_for_seq(self,
- sequence: Sequence) -> "PreTrainedTokenizer":
- return self.tokenizer.get_lora_tokenizer(sequence.lora_request)
- def _init_tokenizer(self, **tokenizer_init_kwargs):
- init_kwargs = dict(
- tokenizer_id=self.model_config.tokenizer,
- enable_lora=bool(self.lora_config),
- max_num_seqs=self.scheduler_config.max_num_seqs,
- max_input_length=None,
- tokenizer_mode=self.model_config.tokenizer_mode,
- trust_remote_code=self.model_config.trust_remote_code,
- revision=self.model_config.revision)
- init_kwargs.update(tokenizer_init_kwargs)
- self.tokenizer: BaseTokenizerGroup = get_tokenizer_group(
- self.parallel_config.tokenizer_pool_config, **init_kwargs)
- if len(self.get_tokenizer()) != self.model_config.get_vocab_size():
- logger.warning(
- f"The tokenizer's vocabulary size {len(self.get_tokenizer())}"
- f" does not match the model's vocabulary size "
- f"{self.model_config.get_vocab_size()}.")
- self.model_config.hf_config.tokenizer_vocab_size = len(
- self.get_tokenizer())
- def _verify_args(self) -> None:
- self.model_config.verify_with_parallel_config(self.parallel_config)
- self.cache_config.verify_with_parallel_config(self.parallel_config)
- if self.lora_config:
- self.lora_config.verify_with_model_config(self.model_config)
- self.lora_config.verify_with_scheduler_config(
- self.scheduler_config)
- def encode_request(
- self,
- request_id: str,
- prompt: Optional[str],
- prompt_token_ids: Optional[List[int]] = None,
- lora_request: Optional[LoRARequest] = None,
- ):
- if prompt_token_ids is None:
- assert prompt is not None
- prompt_token_ids = self.tokenizer.encode(request_id=request_id,
- prompt=prompt,
- lora_request=lora_request)
- return prompt_token_ids
- def add_request(
- self,
- request_id: str,
- prompt: Optional[str],
- sampling_params: SamplingParams,
- prompt_token_ids: Optional[List[int]] = None,
- arrival_time: Optional[float] = None,
- lora_request: Optional[LoRARequest] = None,
- multi_modal_data: Optional[MultiModalData] = None,
- ) -> None:
- """Add a request to the engine's request pool.
- The request is added to the request pool and will be processed by the
- scheduler as `engine.step()` is called. The exact scheduling policy is
- determined by the scheduler.
- Args:
- request_id: The unique ID of the request.
- prompt: The prompt string. Can be None if prompt_token_ids is
- provided.
- sampling_params: The sampling parameters for text generation.
- prompt_token_ids: The token IDs of the prompt. If None, we
- use the tokenizer to convert the prompts to token IDs.
- arrival_time: The arrival time of the request. If None, we use
- the current monotonic time.
- multi_modal_data: The multimodal data for the request.
- Details:
- - Set arrival_time to the current time if it is None.
- - Set prompt_token_ids to the encoded prompt if it is None.
- - Create `best_of` number of :class:`~aphrodite.Sequence` objects.
- - Create a :class:`~aphrodite.SequenceGroup` object
- from the list of :class:`~aphrodite.Sequence`.
- - Add the :class:`~aphrodite.SequenceGroup` object to the scheduler.
- Example:
- >>> # initialize engine
- >>> engine = AphroditeEngine.from_engine_args(engine_args)
- >>> # set request arguments
- >>> example_prompt = "Who is the president of the United States?"
- >>> sampling_params = SamplingParams(temperature=0.0)
- >>> request_id = 0
- >>>
- >>> # add the request to the engine
- >>> engine.add_request(
- >>> str(request_id),
- >>> example_prompt,
- >>> SamplingParams(temperature=0.0))
- >>> # continue the request processing
- >>> ...
- """
- if lora_request is not None and not self.lora_config:
- raise ValueError(f"Got lora_request {lora_request} but LoRA is "
- "not enabled!")
- max_log_probs = self.get_model_config().max_log_probs
- if (sampling_params.logprobs
- and sampling_params.logprobs > max_log_probs) or (
- sampling_params.prompt_logprobs
- and sampling_params.prompt_logprobs > max_log_probs):
- raise ValueError(f"Cannot request more than "
- f"{max_log_probs} logprobs. "
- "Please increase the max_log_probs.")
- if arrival_time is None:
- arrival_time = time.monotonic()
- prompt_token_ids = self.encode_request(
- request_id=request_id,
- prompt=prompt,
- prompt_token_ids=prompt_token_ids,
- lora_request=lora_request,
- )
- # Create the sequences.
- block_size = self.cache_config.block_size
- seq_id = next(self.seq_counter)
- eos_token_id = self.tokenizer.get_lora_tokenizer(
- lora_request).eos_token_id
- seq = Sequence(
- seq_id,
- prompt,
- prompt_token_ids,
- block_size,
- eos_token_id,
- lora_request,
- )
- # Defensive copy of SamplingParams, which are used by the sampler,
- # this doesn't deep-copy LogitsProcessor objects
- sampling_params = sampling_params.clone()
- # Inject the eos token id into the sampling_params to support min_tokens
- # processing
- sampling_params.eos_token_id = seq.eos_token_id
- sampling_params.update_from_generation_config(
- self.generation_config_fields)
- # Create the sequence group.
- seq_group = SequenceGroup(request_id, [seq], sampling_params,
- arrival_time, lora_request, multi_modal_data)
- # Add the sequence group to the scheduler.
- self.scheduler.add_seq_group(seq_group)
- def abort_request(self, request_id: Union[str, Iterable[str]]) -> None:
- """Aborts a request(s) with the given ID.
- Args:
- request_id: The ID(s) of the request to abort.
- Details:
- - Refer to the
- :meth:`~aphrodite.processing.scheduler.Scheduler.abort_seq_group`
- from class :class:`~aphrodite.processing.scheduler.Scheduler`.
- Example:
- >>> # initialize engine and add a request with request_id
- >>> request_id = str(0)
- >>> # abort the request
- >>> engine.abort_request(request_id)
- """
- self.scheduler.abort_seq_group(request_id)
- def get_model_config(self) -> ModelConfig:
- """Gets the model configuration."""
- return self.model_config
- def get_num_unfinished_requests(self) -> int:
- """Gets the number of unfinished requests."""
- return self.scheduler.get_num_unfinished_seq_groups()
- def has_unfinished_requests(self) -> bool:
- """Returns True if there are unfinished requests."""
- return self.scheduler.has_unfinished_seqs()
- def _process_model_outputs(
- self, output: List[SamplerOutput],
- scheduled_seq_groups: List[SequenceGroup],
- ignored_seq_groups: List[SequenceGroup]) -> List[RequestOutput]:
- """Apply the model output to the sequences in the scheduled seq groups.
-
- Returns RequestOutputs that can be returned to the client.
- """
- now = time.time()
- # Organize outputs by [sequence group][step] instead of
- # [step][sequence group].
- output_by_sequence_group = create_output_by_sequence_group(
- sampler_outputs=output, num_seq_groups=len(scheduled_seq_groups))
- # Update the scheduled sequence groups with the model outputs.
- for scheduled_seq_group, outputs in zip(scheduled_seq_groups,
- output_by_sequence_group):
- seq_group = scheduled_seq_group.seq_group
- seq_group.update_num_computed_tokens(
- scheduled_seq_group.token_chunk_size)
- # If all sequences in the sequence group are in DECODE, then we can
- # process the output tokens. Otherwise, they are (chunked) prefill
- # samples and should not be processed.
- stages = [seq.data._stage for seq in seq_group.seqs_dict.values()]
- if all(stage == SequenceStage.DECODE for stage in stages):
- self.output_processor.process_outputs(seq_group, outputs)
- # Free the finished sequence groups.
- self.scheduler.free_finished_seq_groups()
- # Create the outputs.
- request_outputs: List[RequestOutput] = []
- for scheduled_seq_group in scheduled_seq_groups:
- seq_group = scheduled_seq_group.seq_group
- seq_group.maybe_set_first_token_time(now)
- request_output = RequestOutput.from_seq_group(seq_group)
- request_outputs.append(request_output)
- for seq_group in ignored_seq_groups:
- request_output = RequestOutput.from_seq_group(seq_group)
- request_outputs.append(request_output)
- return request_outputs
- def step(self) -> List[RequestOutput]:
- """Performs one decoding iteration and returns newly generated results.
- .. figure:: https://i.imgur.com/sv2HssD.png
- :alt: Overview of the step function
- :align: center
- Overview of the step function.
- Details:
- - Step 1: Schedules the sequences to be executed in the next
- iteration and the token blocks to be swapped in/out/copy.
- - Depending on the scheduling policy,
- sequences may be `preempted/reordered`.
- - A Sequence Group (SG) refer to a group of sequences
- that are generated from the same prompt.
- - Step 2: Calls the distributed executor to execute the model.
- - Step 3: Processes the model output. This mainly includes:
- - Decodes the relevant outputs.
- - Updates the scheduled sequence groups with model outputs
- based on its `sampling parameters` (`use_beam_search` or not).
- - Frees the finished sequence groups.
- - Finally, it creates and returns the newly generated results.
- Example:
- >>> # Please see the example/ folder for more detailed examples.
- >>>
- >>> # initialize engine and request arguments
- >>> engine = AphroditeEngine.from_engine_args(engine_args)
- >>> example_inputs = [(0, "What is LLM?",
- >>> SamplingParams(temperature=0.0))]
- >>>
- >>> # Start the engine with an event loop
- >>> while True:
- >>> if example_inputs:
- >>> req_id, prompt, sampling_params = example_inputs.pop(0)
- >>> engine.add_request(str(req_id), prompt, sampling_params)
- >>>
- >>> # continue the request processing
- >>> request_outputs = engine.step()
- >>> for request_output in request_outputs:
- >>> if request_output.finished:
- >>> # return or show the request output
- >>>
- >>> if not (engine.has_unfinished_requests() or example_inputs):
- >>> break
- """
- seq_group_metadata_list, scheduler_outputs = self.scheduler.schedule()
- if not scheduler_outputs.is_empty():
- output = self.model_executor.execute_model(
- seq_group_metadata_list=seq_group_metadata_list,
- blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
- blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
- blocks_to_copy=scheduler_outputs.blocks_to_copy,
- num_lookahead_slots=scheduler_outputs.num_lookahead_slots)
- else:
- output = []
- request_outputs = self._process_model_outputs(
- output, scheduler_outputs.scheduled_seq_groups,
- scheduler_outputs.ignored_seq_groups)
- # Log stats.
- if self.log_stats:
- self.stat_logger.log(
- self._get_stats(scheduler_outputs, model_output=output))
- return request_outputs
- def do_log_stats(self) -> None:
- """Forced log when no requests active."""
- if self.log_stats:
- self.stat_logger.log(self._get_stats(scheduler_outputs=None))
- def _get_stats(
- self,
- scheduler_outputs: Optional[SchedulerOutputs],
- model_output: Optional[List[SamplerOutput]] = None) -> Stats:
- """Get Stats to be Logged to Prometheus.
- Args:
- scheduler_outputs: Optional, used to populate metrics related to
- the scheduled batch,
- model_output: Optional, used to emit speculative decoding metrics
- which are created by the workers.
- """
- now = time.monotonic()
- # KV Cache Usage in %.
- num_total_gpu = self.cache_config.num_gpu_blocks
- num_free_gpu = self.scheduler.block_manager.get_num_free_gpu_blocks()
- gpu_cache_usage = 1.0 - (num_free_gpu / num_total_gpu)
- num_total_cpu = self.cache_config.num_cpu_blocks
- cpu_cache_usage = 0.0
- if num_total_cpu > 0:
- num_free_cpu = (
- self.scheduler.block_manager.get_num_free_cpu_blocks())
- cpu_cache_usage = 1.0 - (num_free_cpu / num_total_cpu)
- # Scheduler State
- num_running = len(self.scheduler.running)
- num_swapped = len(self.scheduler.swapped)
- num_waiting = len(self.scheduler.waiting)
- # Iteration stats if we have scheduler output.
- num_prompt_tokens = 0
- num_generation_tokens = 0
- time_to_first_tokens = []
- time_per_output_tokens = []
- time_e2e_requests = []
- if scheduler_outputs is not None:
- prompt_run = scheduler_outputs.num_prefill_groups > 0
- # Number of Tokens.
- if prompt_run:
- num_prompt_tokens = sum(
- len(scheduled_seq_group.seq_group.prompt_token_ids)
- for scheduled_seq_group in
- scheduler_outputs.scheduled_seq_groups)
- num_generation_tokens = sum(
- scheduled_seq_group.seq_group.num_seqs()
- for scheduled_seq_group in
- scheduler_outputs.scheduled_seq_groups)
- else:
- num_generation_tokens = scheduler_outputs.num_batched_tokens
- # Latency Timings.
- time_last_iters = []
- for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
- seq_group = scheduled_seq_group.seq_group
- # Time since last token.
- # (n.b. updates seq_group.metrics.last_token_time)
- time_last_iters.append(seq_group.get_last_latency(now))
- # Time since arrival for all finished requests.
- if seq_group.is_finished():
- time_e2e_requests.append(now -
- seq_group.metrics.arrival_time)
- time_to_first_tokens = time_last_iters if prompt_run else []
- time_per_output_tokens = [] if prompt_run else time_last_iters
- # Spec decode, if enabled, emits specialized metrics from the worker in
- # sampler output.
- if model_output and (model_output[0].spec_decode_worker_metrics
- is not None):
- spec_decode_metrics = model_output[0].spec_decode_worker_metrics
- else:
- spec_decode_metrics = None
- return Stats(
- now=now,
- num_running=num_running,
- num_swapped=num_swapped,
- num_waiting=num_waiting,
- gpu_cache_usage=gpu_cache_usage,
- cpu_cache_usage=cpu_cache_usage,
- num_prompt_tokens=num_prompt_tokens,
- num_generation_tokens=num_generation_tokens,
- time_to_first_tokens=time_to_first_tokens,
- time_per_output_tokens=time_per_output_tokens,
- time_e2e_requests=time_e2e_requests,
- spec_decode_metrics=spec_decode_metrics,
- )
- def add_lora(self, lora_request: LoRARequest) -> bool:
- return self.model_executor.add_lora(lora_request)
- def remove_lora(self, lora_id: int) -> bool:
- return self.model_executor.remove_lora(lora_id)
- def list_loras(self) -> List[int]:
- return self.model_executor.list_loras()
- def check_health(self) -> None:
- self.model_executor.check_health()
- setup_logger()
|