import multiprocessing as mp
import os
import shutil
from tempfile import TemporaryDirectory

import pytest
import torch
from huggingface_hub import snapshot_download

from aphrodite import LLM, SamplingParams
from aphrodite.modeling.model_loader.loader import ShardedStateLoader
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]

# Create a sampling params object.
sampling_params = SamplingParams(
    temperature=0,
    max_tokens=256,
    ignore_eos=True,
)
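# _filter_subtensors() is expected to keep a single entry per underlying
# storage and drop keys that merely alias or view another tensor in the dict.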
def test_filter_subtensors():
    state_dict = {
        "a": torch.empty(2),
        "b": torch.empty((2, 4)),
        "c": torch.empty((2, 4, 8)),
    }
    state_dict.update({
        "x": state_dict["b"],
        "y": state_dict["c"][1, 2, :],
        "z": state_dict["c"][1, :, 4],
    })
    filtered_state_dict = ShardedStateLoader._filter_subtensors(state_dict)
    assert tuple(filtered_state_dict.keys()) == ("a", "b", "c")
    for key, tensor in filtered_state_dict.items():
        # NOTE: don't use `equal` here, as the tensor might contain NaNs
        assert tensor is state_dict[key]
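# Module-scoped fixture: download the Llama-2-7B checkpoint once, skipping the
# *.bin weights so only the safetensors files are fetched.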
@pytest.fixture(scope="module")
def llama_2_7b_files():
    with TemporaryDirectory() as cache_dir:
        input_dir = snapshot_download("meta-llama/Llama-2-7b-hf",
                                      cache_dir=cache_dir,
                                      ignore_patterns="*.bin*")
        yield input_dir
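# Load the model in a subprocess and write its sharded (per-worker) state to
# output_dir, alongside the original tokenizer/config metadata files.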
def _run_writer(input_dir, output_dir, weights_patterns, **kwargs):
    llm_sharded_writer = LLM(model=input_dir, **kwargs)
    # Dump worker states to output directory
    llm_sharded_writer.llm_engine.model_executor.save_sharded_state(
        path=output_dir)
    # Copy metadata (non-weight) files to the output directory. Strip the
    # leading "*" so endswith() matches the glob-style weight patterns.
    for file in os.listdir(input_dir):
        if not any(file.endswith(ext.lstrip("*")) for ext in weights_patterns):
            shutil.copy(f"{input_dir}/{file}", output_dir)
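# Load the model in a subprocess, generate completions for the shared prompts,
# and send the outputs back to the parent process through the queue.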
def _run_generate(input_dir, queue: mp.Queue, **kwargs):
    llm = LLM(model=input_dir, **kwargs)
    gen = llm.generate(prompts, sampling_params)
    queue.put([g.outputs[0].__dict__ for g in gen])
    queue.close()
    queue.join_thread()
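# End-to-end check: save a sharded copy of the checkpoint, then verify that
# generation from the sharded state matches generation from the original
# checkpoint, with and without LoRA and for tensor-parallel sizes 1 and 2.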
- @pytest.mark.parametrize("enable_lora", [False, True])
- @pytest.mark.parametrize("tp_size", [1, 2])
- def test_sharded_state_loader(enable_lora, tp_size, num_gpus_available,
- llama_2_7b_files):
- if num_gpus_available < tp_size:
- pytest.skip(f"Not enough GPUs for tensor parallelism {tp_size}")
- weights_patterns = ("*.safetensors", )
- gpu_memory_utilization = 0.8
- input_dir = llama_2_7b_files
- ctx = mp.get_context("spawn")
- # Run in separate processes for memory & CUDA isolation
- with TemporaryDirectory() as output_dir:
- p = ctx.Process(target=_run_writer,
- args=(input_dir, output_dir, weights_patterns),
- kwargs=dict(
- tensor_parallel_size=tp_size,
- distributed_executor_backend="mp",
- gpu_memory_utilization=gpu_memory_utilization,
- enforce_eager=True,
- ))
- p.start()
- p.join()
- queue = ctx.Queue()
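        # Baseline: generate from the original checkpoint with the default
        # weight loader.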
        p = ctx.Process(target=_run_generate,
                        args=(input_dir, queue),
                        kwargs=dict(
                            distributed_executor_backend="mp",
                            enable_lora=enable_lora,
                            gpu_memory_utilization=gpu_memory_utilization,
                            tensor_parallel_size=tp_size,
                        ))
        p.start()
        p.join()
        out_before = queue.get()
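        # Generate again, this time loading the sharded state written above.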
        p = ctx.Process(target=_run_generate,
                        args=(output_dir, queue),
                        kwargs=dict(
                            distributed_executor_backend="mp",
                            enable_lora=enable_lora,
                            gpu_memory_utilization=gpu_memory_utilization,
                            tensor_parallel_size=tp_size,
                            load_format="sharded_state",
                        ))
        p.start()
        p.join()
        out_after = queue.get()

        assert out_before == out_after