from typing import List, Optional, Tuple, Type import pytest import torch import torch.types from transformers import BatchEncoding from aphrodite.common.sequence import SampleLogprobs from aphrodite.multimodal.utils import rescale_image_size from ..conftest import IMAGE_ASSETS, AphroditeRunner, HfRunner, _ImageAssets from .utils import check_logprobs_close pytestmark = pytest.mark.vlm # The image token is placed before "user" on purpose so that the test can pass HF_IMAGE_PROMPTS = IMAGE_ASSETS.prompts({ "stop_sign": "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n" \ "(./)\nWhat's the content of the image?<|eot_id|>" \ "<|start_header_id|>assistant<|end_header_id|>\n\n", # noqa: E501 "cherry_blossom": "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n" \ "(./)\nWhat is the season?<|eot_id|>" \ "<|start_header_id|>assistant<|end_header_id|>\n\n", }) models = ["openbmb/MiniCPM-Llama3-V-2_5"] def _wrap_inputs(hf_inputs: BatchEncoding) -> BatchEncoding: return BatchEncoding({"model_inputs": hf_inputs}) def trunc_hf_output(hf_output: Tuple[List[int], str, Optional[SampleLogprobs]]): output_ids, output_str, out_logprobs = hf_output if output_str.endswith("<|eot_id|>"): output_str = output_str.split("<|eot_id|>")[0] return output_ids, output_str, out_logprobs target_dtype = "half" def run_test( hf_runner: Type[HfRunner], aphrodite_runner: Type[AphroditeRunner], image_assets: _ImageAssets, model: str, *, size_factors: List[float], dtype: str, max_tokens: int, num_logprobs: int, tensor_parallel_size: int, distributed_executor_backend: Optional[str] = None, ): """Inference result should be the same between hf and aphrodite. All the image fixtures for the test is under tests/images. For huggingface runner, we provide the PIL images as input. For aphrodite runner, we provide MultiModalDataDict objects and corresponding MultiModalConfig as input. Note, the text input is also adjusted to abide by aphrodite contract. The text output is sanitized to be able to compare with hf. """ images = [asset.pil_image for asset in image_assets] inputs_per_image = [( [prompt for _ in size_factors], [rescale_image_size(image, factor) for factor in size_factors], ) for image, prompt in zip(images, HF_IMAGE_PROMPTS)] # NOTE: take care of the order. run Aphrodite first, and then run HF. # Aphrodite needs a fresh new process without cuda initialization. # if we run HF first, the cuda initialization will be done and it # will hurt multiprocessing backend with fork method (the default method). # max_model_len should be greater than image_feature_size with aphrodite_runner(model, max_model_len=4096, max_num_seqs=1, dtype=dtype, tensor_parallel_size=tensor_parallel_size, distributed_executor_backend=distributed_executor_backend, enforce_eager=True) as aphrodite_model: tokenizer = aphrodite_model.model.get_tokenizer() stop_token_ids = [tokenizer.eos_id, tokenizer.eot_id] aphrodite_outputs_per_image = [ aphrodite_model.generate_greedy_logprobs(prompts, max_tokens, num_logprobs=num_logprobs, images=images, stop_token_ids=stop_token_ids) for prompts, images in inputs_per_image ] hf_model = hf_runner(model, dtype=dtype, postprocess_inputs=_wrap_inputs) with hf_model, torch.no_grad(): hf_outputs_per_image = [ hf_model.generate_greedy_logprobs_limit(prompts, max_tokens, num_logprobs=num_logprobs, images=images, tokenizer=tokenizer) for prompts, images in inputs_per_image ] for hf_outputs, aphrodite_outputs in zip(hf_outputs_per_image, aphrodite_outputs_per_image): check_logprobs_close( outputs_0_lst=[ trunc_hf_output(hf_output) for hf_output in hf_outputs ], outputs_1_lst=aphrodite_outputs, name_0="hf", name_1="aphrodite", ) @pytest.mark.parametrize("model", models) @pytest.mark.parametrize( "size_factors", [ # No image [], # Single-scale [1.0], # Single-scale, batched [1.0, 1.0, 1.0], # Multi-scale [0.25, 0.5, 1.0], ], ) @pytest.mark.parametrize("dtype", [target_dtype]) @pytest.mark.parametrize("max_tokens", [128]) @pytest.mark.parametrize("num_logprobs", [5]) def test_models(hf_runner, aphrodite_runner, image_assets, model, size_factors, dtype: str, max_tokens: int, num_logprobs: int) -> None: run_test( hf_runner, aphrodite_runner, image_assets, model, size_factors=size_factors, dtype=dtype, max_tokens=max_tokens, num_logprobs=num_logprobs, tensor_parallel_size=1, ) HF_MULTIIMAGE_IMAGE_PROMPT = \ "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n" \ "(./)\n(./)\n" \ "Describe these images.<|eot_id|>" \ "<|start_header_id|>assistant<|end_header_id|>\n\n" def run_multi_image_test( hf_runner: Type[HfRunner], aphrodite_runner: Type[AphroditeRunner], image_assets: _ImageAssets, model: str, *, size_factors: List[float], dtype: str, max_tokens: int, num_logprobs: int, tensor_parallel_size: int, distributed_executor_backend: Optional[str] = None, ): """Inference result should be the same between hf and aphrodite. All the image fixtures for the test is under tests/images. For huggingface runner, we provide the PIL images as input. For aphrodite runner, we provide MultiModalDataDict objects and corresponding MultiModalConfig as input. Note, the text input is also adjusted to abide by aphrodite contract. The text output is sanitized to be able to compare with hf. """ images = [asset.pil_image for asset in image_assets] inputs_per_case = [ ([HF_MULTIIMAGE_IMAGE_PROMPT for _ in size_factors], [[rescale_image_size(image, factor) for image in images] for factor in size_factors]) ] # NOTE: take care of the order. run Aphrodite first, and then run HF. # Aphrodite needs a fresh new process without cuda initialization. # if we run HF first, the cuda initialization will be done and it # will hurt multiprocessing backend with fork method (the default method). # max_model_len should be greater than image_feature_size with aphrodite_runner(model, max_model_len=4096, max_num_seqs=1, limit_mm_per_prompt={"image": len(images)}, dtype=dtype, tensor_parallel_size=tensor_parallel_size, distributed_executor_backend=distributed_executor_backend, enforce_eager=True) as aphrodite_model: tokenizer = aphrodite_model.model.get_tokenizer() stop_token_ids = [tokenizer.eos_id, tokenizer.eot_id] aphrodite_outputs_per_case = [ aphrodite_model.generate_greedy_logprobs(prompts, max_tokens, num_logprobs=num_logprobs, images=images, stop_token_ids=stop_token_ids) for prompts, images in inputs_per_case ] hf_model = hf_runner(model, dtype=dtype, postprocess_inputs=_wrap_inputs) with hf_model, torch.no_grad(): hf_outputs_per_case = [ hf_model.generate_greedy_logprobs_limit(prompts, max_tokens, num_logprobs=num_logprobs, images=images, tokenizer=tokenizer) for prompts, images in inputs_per_case ] for hf_outputs, aphrodite_outputs in zip(hf_outputs_per_case, aphrodite_outputs_per_case): check_logprobs_close( outputs_0_lst=[ trunc_hf_output(hf_output) for hf_output in hf_outputs ], outputs_1_lst=aphrodite_outputs, name_0="hf", name_1="aphrodite", ) @pytest.mark.parametrize("model", models) @pytest.mark.parametrize( "size_factors", [ # No image [], # Single-scale [1.0], # Single-scale, batched [1.0, 1.0, 1.0], # Multi-scale [0.25, 0.5, 1.0], ], ) @pytest.mark.parametrize("dtype", [target_dtype]) @pytest.mark.parametrize("max_tokens", [128]) @pytest.mark.parametrize("num_logprobs", [5]) def test_multi_images_models(hf_runner, aphrodite_runner, image_assets, model, size_factors, dtype: str, max_tokens: int, num_logprobs: int) -> None: run_multi_image_test( hf_runner, aphrodite_runner, image_assets, model, size_factors=size_factors, dtype=dtype, max_tokens=max_tokens, num_logprobs=num_logprobs, tensor_parallel_size=1, )