- """Tests for cutlass kernels
- Run `pytest tests/kernels/test_cutlass.py`.
- """
- from typing import Optional, Type
- import pytest
- import torch
- from aphrodite import _custom_ops as ops
- from aphrodite.platforms import current_platform
- from tests.kernels.utils import opcheck
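- # Use at most two CUDA devices for the multi-device tests below.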
- CUDA_DEVICES = [
- f"cuda:{i}" for i in range(1 if torch.cuda.device_count() == 1 else 2)
- ]
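- # Encode the (major, minor) compute capability as a single integer
- # (e.g. (8, 9) -> 89) for the FP8 skipif checks below.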
- capability = current_platform.get_device_capability()
- capability = capability[0] * 10 + capability[1]
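- # Quantization helpers: clamp values into the representable range of the
- # target dtype before casting so out-of-range inputs saturate cleanly.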
- def to_fp8(tensor: torch.Tensor):
- finfo = torch.finfo(torch.float8_e4m3fn)
- return torch.round(tensor.clamp(
- min=finfo.min, max=finfo.max)).to(dtype=torch.float8_e4m3fn)
- def to_int8(tensor: torch.Tensor):
- return torch.round(tensor.clamp(min=-128, max=127)).to(dtype=torch.int8)
- def rand_int8(shape: tuple, device: str = "cuda"):
- return to_int8(torch.rand(shape, device=device) * 255 - 128)
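- # Reference implementation: matmul in fp32, then apply broadcasted scales.
- # scale_a is (m, 1) for per-token or (1, 1) for per-tensor quantization;
- # scale_b is (1, n) for per-output-channel or (1, 1) for per-tensor.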
- def baseline_scaled_mm(a: torch.Tensor,
- b: torch.Tensor,
- scale_a: torch.Tensor,
- scale_b: torch.Tensor,
- out_dtype: Type[torch.dtype],
- bias: Optional[torch.Tensor] = None) -> torch.Tensor:
- output = (scale_a * (scale_b * (torch.mm(
- a.to(dtype=torch.float32), b.to(dtype=torch.float32))))).to(out_dtype)
- if bias is not None:
- output = output + bias
- return output
- def cutlass_fp8_gemm_helper(m: int,
- n: int,
- k: int,
- per_token_act_quant: bool,
- per_out_channel_weight_quant: bool,
- use_bias: bool,
- out_dtype: Type[torch.dtype] = torch.bfloat16,
- device: str = "cuda"):
- # Test a CUTLASS FP8 GEMM with optional per-token activation quantization
- # and optional per-output-channel weight quantization.
- a = to_fp8(torch.randn((m, k), device=device))
- b = to_fp8(torch.randn((n, k), device=device).t())
- m_a_scales = m if per_token_act_quant else 1
- n_b_scales = n if per_out_channel_weight_quant else 1
- scale_a = (torch.randn((m_a_scales, 1), device=device,
- dtype=torch.float32))
- scale_b = (torch.randn((1, n_b_scales), device=device,
- dtype=torch.float32))
- if use_bias:
- bias = torch.rand((n, ), device=device, dtype=out_dtype) * 10
- else:
- bias = None
- out = ops.cutlass_scaled_mm(a, b, scale_a, scale_b, out_dtype, bias)
- baseline = baseline_scaled_mm(a, b, scale_a, scale_b, out_dtype, bias)
- torch.testing.assert_close(out, baseline, rtol=1e-2, atol=5e-2)
- def cutlass_int8_gemm_helper(m: int,
- n: int,
- k: int,
- per_token_act_quant: bool,
- per_out_channel_weight_quant: bool,
- use_bias: bool,
- out_dtype: Type[torch.dtype] = torch.bfloat16,
- device: str = "cuda"):
- # Test a CUTLASS INT8 GEMM with optional per-token activation quantization
- # and optional per-output-channel weight quantization.
- a = to_int8(torch.randn((m, k), device=device) * 5)
- b = to_int8(torch.randn((n, k), device=device).t() * 5)
- m_a_scales = m if per_token_act_quant else 1
- n_b_scales = n if per_out_channel_weight_quant else 1
- scale_a = (torch.randn((m_a_scales, 1), device=device,
- dtype=torch.float32))
- scale_b = (torch.randn((1, n_b_scales), device=device,
- dtype=torch.float32))
- if use_bias:
- bias = torch.rand((n, ), device=device, dtype=out_dtype) * 10
- else:
- bias = None
- out = ops.cutlass_scaled_mm(a, b, scale_a, scale_b, out_dtype, bias)
- baseline = baseline_scaled_mm(a, b, scale_a, scale_b, out_dtype, bias)
- torch.testing.assert_close(out, baseline, rtol=1e-1, atol=1e0)
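- # opcheck additionally validates the custom-op registration for
- # _C.cutlass_scaled_mm (schema / fake-tensor checks), not just numerics.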
- opcheck(torch.ops._C.cutlass_scaled_mm,
- (out, a, b, scale_a, scale_b, bias))
- @pytest.mark.parametrize("m", [1, 16, 32, 64, 128, 256, 512, 222, 100, 33])
- @pytest.mark.parametrize("n", [2048, 4096, 8192, 16384, 24576, 256, 1024])
- @pytest.mark.parametrize("k", [128, 496, 1024])
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.skipif(capability < 89,
- reason="FP8 is not supported on this GPU type.")
- def test_cutlass_fp8_gemm(m: int, n: int, k: int, per_act_token: bool,
- per_out_ch: bool, use_bias: bool):
- cutlass_fp8_gemm_helper(m, n, k, per_act_token, per_out_ch, use_bias)
- @pytest.mark.parametrize("m", [1, 16, 32, 64, 128, 256, 512, 222, 33, 1])
- @pytest.mark.parametrize("n", [2048, 8192, 16384, 256, 1024])
- @pytest.mark.parametrize("k", [128, 496, 1024])
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- def test_cutlass_int8_gemm(m: int, n: int, k: int, per_act_token: bool,
- per_out_ch: bool, use_bias: bool):
- cutlass_int8_gemm_helper(m, n, k, per_act_token, per_out_ch, use_bias)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("out_dtype", [torch.bfloat16, torch.float16])
- @pytest.mark.parametrize("use_bias", [True, False])
- def test_cutlass_int8_gemm_output_dtype(per_act_token: bool, per_out_ch: bool,
- out_dtype: Type[torch.dtype],
- use_bias: bool):
- cutlass_int8_gemm_helper(512,
- 512,
- 512,
- per_act_token,
- per_out_ch,
- use_bias,
- out_dtype=out_dtype)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("out_dtype", [torch.bfloat16, torch.float16])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.skipif(capability < 89,
- reason="FP8 is not supported on this GPU type.")
- def test_cutlass_fp8_gemm_output_dtype(per_act_token: bool, per_out_ch: bool,
- out_dtype: Type[torch.dtype],
- use_bias: bool):
- cutlass_fp8_gemm_helper(512,
- 512,
- 512,
- per_act_token,
- per_out_ch,
- use_bias,
- out_dtype=out_dtype)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.parametrize("device", CUDA_DEVICES)
- @pytest.mark.skipif(capability < 89,
- reason="FP8 is not supported on this GPU type.")
- def test_cutlass_fp8_gemm_devices(per_act_token: bool, per_out_ch: bool,
- use_bias: bool, device: str):
- cutlass_fp8_gemm_helper(512, 512, 512, per_act_token, per_out_ch, use_bias,
- torch.bfloat16, device)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.parametrize("device", CUDA_DEVICES)
- def test_cutlass_int8_gemm_devices(per_act_token: bool, per_out_ch: bool,
- use_bias: bool, device: str):
- cutlass_int8_gemm_helper(512,
- 512,
- 512,
- per_act_token,
- per_out_ch,
- use_bias,
- out_dtype=torch.bfloat16,
- device=device)
- # For the following two tests:
- # N and K correspond to the size of the weight matrix and are likely to be
- # multiples of a large power of two. In any case, the kernel will have a naive
- # fallback when N and K are not divisible by 16. But M is the number of tokens
- # and the kernel must handle any M thrown at it.
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.skipif(capability < 89,
- reason="FP8 is not supported on this GPU type.")
- def test_cutlass_fp8_gemm_m_sweep(per_act_token: bool, per_out_ch: bool,
- use_bias: bool):
- for nk in range(32, 128, 32):
- for m in range(1, 128):
- cutlass_fp8_gemm_helper(m, nk, nk, per_act_token, per_out_ch,
- use_bias)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- @pytest.mark.parametrize("use_bias", [True, False])
- def test_cutlass_int8_gemm_m_sweep(per_act_token: bool, per_out_ch: bool,
- use_bias: bool):
- for nk in range(32, 128, 32):
- for m in range(1, 128):
- cutlass_int8_gemm_helper(m, nk, nk, per_act_token, per_out_ch,
- use_bias)
- @pytest.mark.parametrize("m", [32, 64, 128])
- @pytest.mark.parametrize("n", [16, 32, 64])
- @pytest.mark.parametrize("k", [64, 128, 256])
- @pytest.mark.parametrize("out_dtype", [torch.bfloat16, torch.float16])
- @pytest.mark.skip
- def test_cutlass_int8_azp_bias_fold(m: int, n: int, k: int,
- out_dtype: torch.dtype):
- # Currently, the test is failing because folding azp into
- # 16-bit bias loses too much precision
- scale_a = torch.randn((1, 1), device="cuda", dtype=torch.float32) / 10
- scale_b = torch.randn((1, n), device="cuda", dtype=torch.float32) / 10
- aq_i8 = rand_int8((m, k))
- bq_i8 = rand_int8((n, k)).t()
- aq_i32 = aq_i8.to(dtype=torch.int32)
- bq_i32 = bq_i8.to(dtype=torch.int32)
- aq_f32 = aq_i8.to(dtype=torch.float32)
- bq_f32 = bq_i8.to(dtype=torch.float32)
- b_dq = scale_b * bq_f32
- azp_a = torch.rand((1, ), device="cuda", dtype=torch.float32) * 10 + 1.5
- azp_aq_i8 = (azp_a / scale_a).to(dtype=torch.int8)
- azp_a = azp_aq_i8.to(dtype=torch.float32) * scale_a # correct for rounding
- a_dq = scale_a * (aq_i32 + azp_aq_i8).to(dtype=torch.float32)
- torch.testing.assert_close(a_dq, scale_a * aq_f32 + azp_a)
- baseline_dq = torch.mm(a_dq, b_dq).to(out_dtype)
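- # Bias folding: with A ~= scale_a * (A_q + azp_q),
- #   A @ B ~= scale_a * scale_b * (A_q @ B_q) + azp_a * scale_b * (J @ B_q)
- # where J is an all-ones (1, k) row vector. For a per-tensor zero point the
- # second term is independent of A, so it can be folded into the bias.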
- J = torch.ones((1, k), device="cuda", dtype=torch.float32)
- azp_bias = (azp_a * scale_b * (J @ bq_f32)).to(out_dtype)
- assert azp_bias.shape == (1, n)
- assert azp_bias[0, :].shape == (n, )
- baseline_q = (scale_a.to(device='cpu') * scale_b.to(device='cpu') * (
- (aq_i32 + azp_aq_i8).to(device='cpu') @ bq_i32.to(device='cpu'))).to(
- dtype=out_dtype, device='cuda')
- out = ops.cutlass_scaled_mm(aq_i8,
- bq_i8,
- scale_a,
- scale_b,
- out_dtype=out_dtype,
- bias=azp_bias[0, :])
- torch.testing.assert_close(out, baseline_dq, rtol=1e-2, atol=1e0)
- torch.testing.assert_close(out, baseline_q, rtol=1e-2, atol=1e0)
- @pytest.mark.parametrize("m", [32, 64, 128])
- @pytest.mark.parametrize("n", [16, 32, 64])
- @pytest.mark.parametrize("k", [64, 128, 256])
- @pytest.mark.parametrize("out_dtype", [torch.bfloat16, torch.float16])
- @pytest.mark.parametrize("use_bias", [True, False])
- @pytest.mark.parametrize("azp_per_token", [True, False])
- def test_cutlass_int8_azp(m: int, n: int, k: int, out_dtype: torch.dtype,
- use_bias: bool, azp_per_token: bool):
- m_azp = m if azp_per_token else 1
- scale_a = torch.randn((m_azp, 1), device="cuda", dtype=torch.float32) / 10
- scale_b = torch.randn((1, n), device="cuda", dtype=torch.float32) / 10
- aq_i8 = rand_int8((m, k))
- aq_i32 = aq_i8.to(dtype=torch.int32)
- aq_f32 = aq_i8.to(dtype=torch.float32)
- bq_i8 = rand_int8((n, k)).t()
- bq_i32 = bq_i8.to(dtype=torch.int32)
- bq_f32 = bq_i8.to(dtype=torch.float32)
- b_dq = scale_b * bq_f32
- azp_a = torch.rand(
- (m_azp, 1), device="cuda", dtype=torch.float32) * 10 + 1.5
- azp_aq_i8 = (azp_a / scale_a).to(dtype=torch.int8)
- azp_a = azp_aq_i8.to(dtype=torch.float32) * scale_a # correct for rounding
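- # Dequantization convention in this test: A ~= scale_a * (A_q - azp_q), i.e.
- # the zero point is subtracted (opposite sign from the bias-fold test above).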
- a_dq = scale_a * (aq_i32 - azp_aq_i8).to(dtype=torch.float32)
- torch.testing.assert_close(a_dq,
- scale_a * aq_f32 - azp_a,
- rtol=1e-4,
- atol=1e-3)
- if use_bias:
- bias = torch.rand((1, n), device="cuda", dtype=out_dtype) * 10 + 2.5
- else:
- bias = torch.zeros((1, n), device="cuda", dtype=out_dtype)
- baseline_dq = (torch.mm(a_dq, b_dq) + bias).to(out_dtype)
- # int32 matmul is not supported on CUDA, so compute the quantized baseline on the CPU
- a_noazp_i32_cpu = (aq_i32 - azp_aq_i8).to(device='cpu')
- cq = (a_noazp_i32_cpu @ bq_i32.to(device='cpu')).to(device='cuda')
- baseline_q = (scale_a * scale_b * cq + bias).to(dtype=out_dtype)
- # The AZP adjustment term is J @ B (J an all-ones row vector), i.e. the column sums of B
- azp_adj_i32 = bq_i32.sum(dim=0, keepdim=True, dtype=torch.int32)
- azp_i32 = azp_aq_i8.to(dtype=torch.int32)
- func_bias = bias if use_bias else None
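- # Per-token AZP: azp and azp_adj are passed separately. Per-tensor AZP: the
- # product azp * azp_adj is a constant row vector, so it is precomputed and
- # passed in place of azp_adj with azp=None.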
- if azp_per_token:
- out = ops.cutlass_scaled_mm_azp(aq_i8, bq_i8, scale_a, scale_b,
- out_dtype, azp_adj_i32, azp_i32,
- func_bias)
- else:
- azp_with_adj_i32 = azp_i32 * azp_adj_i32
- out = ops.cutlass_scaled_mm_azp(aq_i8, bq_i8, scale_a, scale_b,
- out_dtype, azp_with_adj_i32, None,
- func_bias)
- # bfloat16 precision is 7-bit mantissa -> 2^-8 ~ 0.4%
- # float16 precision is 10-bit mantissa -> 2^-11 ~ 0.05%
- rtol = 1e-2 if out_dtype == torch.bfloat16 else 1e-3
- atol = 1e-3
- torch.testing.assert_close(out, baseline_dq, rtol=rtol, atol=atol)
- torch.testing.assert_close(out, baseline_q, rtol=rtol, atol=atol)
- if azp_per_token:
- opcheck(torch.ops._C.cutlass_scaled_mm_azp,
- (out, aq_i8, bq_i8, scale_a, scale_b, azp_adj_i32, azp_i32,
- func_bias))
- else:
- opcheck(torch.ops._C.cutlass_scaled_mm_azp,
- (out, aq_i8, bq_i8, scale_a, scale_b, azp_with_adj_i32, None,
- func_bias))
- # Test working with a subset of A and B
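- # a and b below are strided views into larger tensors, so the kernel sees
- # non-default strides / leading dimensions.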
- def test_cutlass_subset():
- big_m, big_n, big_k = 1024, 1024, 1024
- m, n, k = 512, 512, 512
- whole_a = to_int8(torch.randn((big_m, big_k), device="cuda") * 5)
- whole_b = to_int8(torch.randn((big_n, big_k), device="cuda").t() * 5)
- a = whole_a[0:m, 0:k]
- b = whole_b[0:k, 0:n]
- scale_a = torch.randn((1, 1), device="cuda", dtype=torch.float32) / 10
- scale_b = torch.randn((1, 1), device="cuda", dtype=torch.float32) / 10
- out = ops.cutlass_scaled_mm(a,
- b,
- scale_a,
- scale_b,
- out_dtype=torch.bfloat16)
- baseline = baseline_scaled_mm(a,
- b,
- scale_a,
- scale_b,
- out_dtype=torch.bfloat16)
- torch.testing.assert_close(out, baseline, rtol=1e-1, atol=1e0)
- # Test to make sure CUDA graphs work
- class CutlassLayer(torch.nn.Module):
- def __init__(self, b, scale_a, scale_b, out_dtype):
- super().__init__()
- self.b = b
- self.scale_a = scale_a
- self.scale_b = scale_b
- self.out_dtype = out_dtype
- def forward(self, a):
- return ops.cutlass_scaled_mm(a, self.b, self.scale_a, self.scale_b,
- self.out_dtype)
- @pytest.mark.parametrize("per_act_token", [True, False])
- @pytest.mark.parametrize("per_out_ch", [True, False])
- def test_cutlass_cuda_graph(per_act_token: bool, per_out_ch: bool):
- m, n, k = 512, 512, 512
- a = to_int8(torch.randn((m, k), device="cuda"))
- b = to_int8(torch.randn((n, k), device="cuda").t())
- m_a_scales = m if per_act_token else 1
- n_b_scales = n if per_out_ch else 1
- scale_a = (torch.randn(
- (m_a_scales, 1), device="cuda", dtype=torch.float32) / 10)
- scale_b = (torch.randn(
- (1, n_b_scales), device="cuda", dtype=torch.float32) / 10)
- # Construct a trivial model with a single layer that calls a CUTLASS kernel
- model = CutlassLayer(b, scale_a, scale_b, torch.bfloat16)
- # Run the model with a cuda graph
- stream = torch.cuda.Stream()
- with torch.cuda.stream(stream):
- g = torch.cuda.CUDAGraph()
- with torch.cuda.graph(g):
- out = model(a)
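- # Zero the captured output buffer, then replay: a correct capture repopulates
- # the same buffer with the GEMM result.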
- out.zero_()
- g.replay()
- baseline = torch.mm(scale_a * a.to(dtype=torch.float32),
- scale_b * b.to(dtype=torch.float32)).to(torch.bfloat16)
- torch.testing.assert_close(out, baseline, rtol=1e-1, atol=1e0)
|