123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- # ruff: noqa
- #
- # GGUF file reading/modification support. For API usage information,
- # please see the files scripts/ for some fairly simple examples.
- #
- from __future__ import annotations
- import os
- from collections import OrderedDict
- from typing import Any, Literal, NamedTuple, TypeVar, Union
- import numpy as np
- import numpy.typing as npt
- if __name__ == "__main__":
- import sys
- from pathlib import Path
- # Allow running file in package as a script.
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from .constants import (
- GGML_QUANT_SIZES,
- GGUF_DEFAULT_ALIGNMENT,
- GGUF_MAGIC,
- GGUF_VERSION,
- GGMLQuantizationType,
- GGUFValueType,
- )
- READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]
- class ReaderField(NamedTuple):
- # Offset to start of this field.
- offset: int
- # Name of the field (not necessarily from file data).
- name: str
- # Data parts. Some types have multiple components, such as strings
- # that consist of a length followed by the string data.
- parts: list[npt.NDArray[Any]] = []
- # Indexes into parts that we can call the actual data. For example
- # an array of strings will be populated with indexes to the actual
- # string data.
- data: list[int] = [-1]
- types: list[GGUFValueType] = []
- class ReaderTensor(NamedTuple):
- name: str
- tensor_type: GGMLQuantizationType
- shape: npt.NDArray[np.uint32]
- n_elements: int
- n_bytes: int
- data_offset: int
- data: npt.NDArray[Any]
- field: ReaderField
- class GGUFReader:
- # I - same as host, S - swapped
- byte_order: Literal['I' | 'S'] = 'I'
- alignment: int = GGUF_DEFAULT_ALIGNMENT
- # Note: Internal helper, API may change.
- gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
- GGUFValueType.UINT8: np.uint8,
- GGUFValueType.INT8: np.int8,
- GGUFValueType.UINT16: np.uint16,
- GGUFValueType.INT16: np.int16,
- GGUFValueType.UINT32: np.uint32,
- GGUFValueType.INT32: np.int32,
- GGUFValueType.FLOAT32: np.float32,
- GGUFValueType.UINT64: np.uint64,
- GGUFValueType.INT64: np.int64,
- GGUFValueType.FLOAT64: np.float64,
- GGUFValueType.BOOL: np.bool_,
- }
- def __init__(self,
- path: os.PathLike[str] | str,
- mode: Literal['r' | 'r+' | 'c'] = 'r'):
- self.data = np.memmap(path, mode=mode)
- offs = 0
- if self._get(offs, np.uint32, override_order='<')[0] != GGUF_MAGIC:
- raise ValueError('GGUF magic invalid')
- offs += 4
- temp_version = self._get(offs, np.uint32)
- if temp_version[0] & 65535 == 0:
- # If we get 0 here that means it's (probably) a GGUF file created for
- # the opposite byte order of the machine this script is running on.
- self.byte_order = 'S'
- temp_version = temp_version.newbyteorder(self.byte_order)
- version = temp_version[0]
- if version not in READER_SUPPORTED_VERSIONS:
- raise ValueError(
- f'Sorry, file appears to be version {version} which we cannot handle'
- )
- self.fields: OrderedDict[str, ReaderField] = OrderedDict()
- self.tensors: list[ReaderTensor] = []
- offs += self._push_field(
- ReaderField(offs, 'GGUF.version', [temp_version], [0],
- [GGUFValueType.UINT32]))
- temp_counts = self._get(offs, np.uint64, 2)
- offs += self._push_field(
- ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0],
- [GGUFValueType.UINT64]))
- offs += self._push_field(
- ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0],
- [GGUFValueType.UINT64]))
- tensor_count, kv_count = temp_counts
- offs = self._build_fields(offs, kv_count)
- offs, tensors_fields = self._build_tensors_fields(offs, tensor_count)
- new_align = self.fields.get('general.alignment')
- if new_align is not None:
- if new_align.types != [GGUFValueType.UINT32]:
- raise ValueError('Bad type for general.alignment field')
- self.alignment = new_align.parts[-1][0]
- padding = offs % self.alignment
- if padding != 0:
- offs += self.alignment - padding
- self._build_tensors(offs, tensors_fields)
- _DT = TypeVar('_DT', bound=npt.DTypeLike)
- # Fetch a key/value metadata field by key.
- def get_field(self, key: str) -> Union[ReaderField, None]:
- return self.fields.get(key, None)
- # Fetch a tensor from the list by index.
- def get_tensor(self, idx: int) -> ReaderTensor:
- return self.tensors[idx]
- def _get(
- self,
- offset: int,
- dtype: npt.DTypeLike,
- count: int = 1,
- override_order: None | Literal['I' | 'S' | '<'] = None,
- ) -> npt.NDArray[Any]:
- count = int(count)
- itemsize = int(np.empty([], dtype=dtype).itemsize)
- end_offs = offset + itemsize * count
- return (self.data[offset:end_offs].view(
- dtype=dtype)[:count].newbyteorder(override_order
- or self.byte_order))
- def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
- if field.name in self.fields:
- raise KeyError(
- f'Duplicate {field.name} already in list at offset {field.offset}'
- )
- self.fields[field.name] = field
- return 0 if skip_sum else sum(int(part.nbytes) for part in field.parts)
- def _get_str(
- self, offset: int
- ) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
- slen = self._get(offset, np.uint64)
- return slen, self._get(offset + 8, np.uint8, slen[0])
- def _get_field_parts(
- self,
- orig_offs: int,
- raw_type: int,
- ) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
- offs = orig_offs
- types: list[GGUFValueType] = []
- gtype = GGUFValueType(raw_type)
- types.append(gtype)
- # Handle strings.
- if gtype == GGUFValueType.STRING:
- sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
- size = sum(int(part.nbytes) for part in sparts)
- return size, sparts, [1], types
- # Check if it's a simple scalar type.
- nptype = self.gguf_scalar_to_np.get(gtype)
- if nptype is not None:
- val = self._get(offs, nptype)
- return int(val.nbytes), [val], [0], types
- # Handle arrays.
- if gtype == GGUFValueType.ARRAY:
- raw_itype = self._get(offs, np.uint32)
- offs += int(raw_itype.nbytes)
- alen = self._get(offs, np.uint64)
- offs += int(alen.nbytes)
- aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
- data_idxs: list[int] = []
- for idx in range(alen[0]):
- curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(
- offs, raw_itype[0])
- if idx == 0:
- types += curr_types
- idxs_offs = len(aparts)
- aparts += curr_parts
- data_idxs += (idx + idxs_offs for idx in curr_idxs)
- offs += curr_size
- return offs - orig_offs, aparts, data_idxs, types
- # We can't deal with this one.
- raise ValueError('Unknown/unhandled field type {gtype}')
- def _get_tensor(self, orig_offs: int) -> ReaderField:
- offs = orig_offs
- name_len, name_data = self._get_str(offs)
- offs += int(name_len.nbytes + name_data.nbytes)
- n_dims = self._get(offs, np.uint32)
- offs += int(n_dims.nbytes)
- dims = self._get(offs, np.uint64, n_dims[0])
- offs += int(dims.nbytes)
- raw_dtype = self._get(offs, np.uint32)
- offs += int(raw_dtype.nbytes)
- offset_tensor = self._get(offs, np.uint64)
- offs += int(offset_tensor.nbytes)
- return ReaderField(
- orig_offs,
- str(bytes(name_data), encoding='utf-8'),
- [name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
- [1, 3, 4, 5],
- )
- def _build_fields(self, offs: int, count: int) -> int:
- for _ in range(count):
- orig_offs = offs
- kv_klen, kv_kdata = self._get_str(offs)
- offs += int(kv_klen.nbytes + kv_kdata.nbytes)
- raw_kv_type = self._get(offs, np.uint32)
- offs += int(raw_kv_type.nbytes)
- parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
- idxs_offs = len(parts)
- field_size, field_parts, field_idxs, field_types = self._get_field_parts(
- offs, raw_kv_type[0])
- parts += field_parts
- self._push_field(ReaderField(
- orig_offs,
- str(bytes(kv_kdata), encoding='utf-8'),
- parts,
- [idx + idxs_offs for idx in field_idxs],
- field_types,
- ),
- skip_sum=True)
- offs += field_size
- return offs
- def _build_tensors_fields(self, offs: int,
- count: int) -> tuple[int, list[ReaderField]]:
- tensor_fields = []
- for _ in range(count):
- field = self._get_tensor(offs)
- offs += sum(int(part.nbytes) for part in field.parts)
- tensor_fields.append(field)
- return offs, tensor_fields
- def _build_tensors(self, start_offs: int,
- fields: list[ReaderField]) -> None:
- tensors = []
- for field in fields:
- _name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
- ggml_type = GGMLQuantizationType(raw_dtype[0])
- n_elems = np.prod(dims)
- block_size, type_size = GGML_QUANT_SIZES[ggml_type]
- n_bytes = n_elems * type_size // block_size
- data_offs = int(start_offs + offset_tensor[0])
- item_type: npt.DTypeLike
- if ggml_type == GGMLQuantizationType.F16:
- item_count = n_elems
- item_type = np.float16
- elif ggml_type == GGMLQuantizationType.F32:
- item_count = n_elems
- item_type = np.float32
- elif ggml_type == GGMLQuantizationType.F64:
- item_count = n_elems
- item_type = np.float64
- elif ggml_type == GGMLQuantizationType.I8:
- item_count = n_elems
- item_type = np.int8
- elif ggml_type == GGMLQuantizationType.I16:
- item_count = n_elems
- item_type = np.int16
- elif ggml_type == GGMLQuantizationType.I32:
- item_count = n_elems
- item_type = np.int32
- elif ggml_type == GGMLQuantizationType.I64:
- item_count = n_elems
- item_type = np.int64
- else:
- item_count = n_bytes
- item_type = np.uint8
- tensors.append(
- ReaderTensor(
- name=str(bytes(name_data), encoding='utf-8'),
- tensor_type=ggml_type,
- shape=dims,
- n_elements=n_elems,
- n_bytes=n_bytes,
- data_offset=data_offs,
- data=self._get(data_offs, item_type, item_count),
- field=field,
- ))
- self.tensors = tensors
|