# Transplant MTP block from one GGUF file into another

> Source: <https://gist.github.com/buzz/1c439684d5e3f36492ae9f64ef7e3f67>
> Published: 2026-05-01 23:30:18+00:00

| #!/usr/bin/env python3 | |
| """ | |
| Transplant extra tensors (e.g. MTP layers) from one GGUF file into another, | |
| producing a mixed-quantization GGUF. | |
| Note: Tested with ik_llama.cpp GGUF Python module. | |
| Usage: | |
| python convert.py <target.gguf> <source.gguf> <output.gguf> | |
| Arguments: | |
| target — base GGUF (tensors + metadata kept as-is) | |
| source — GGUF with extra blocks to transplant (e.g. blk.64.* for MTP) | |
| output — resulting mixed-quantization GGUF | |
| The script preserves the exact on-disk layout including per-row metadata | |
| for quantization types like IQ4_KS that have row_meta_size > 0. This is | |
| critical for GPU inference to work correctly. | |
| Donor Models: | |
| To save bandwidth, you can use 'tensors-only' GGUFs as the source file. | |
| Credits to AzerbaijanNyan for the extraction: | |
| https://www.reddit.com/r/LocalLLaMA/comments/1t6r1ny/extracted_mtp_tensor_ggufs_smaller_donor_models/ | |
| Available donors: | |
| - Qwen3.6-35A3B: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-35A3B-MTP-TENSORS-ONLY | |
| - Qwen3.6-27b: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-27b-MTP-TENSORS-ONLY | |
| Example: | |
| # Transplant MTP block from Q8_0 into IQ4_KS base model | |
| python convert.py Qwen3.6-27B-IQ4_KS.gguf Qwen3.6-27B-MTP-Q8_0.gguf Qwen3.6-27B-MTP-IQ4_KS.gguf | |
| """ | |
| import hashlib | |
| import sys | |
| import struct | |
| from pathlib import Path | |
| from gguf import GGUFReader, GGUFValueType | |
| def get_field_value(reader: GGUFReader, key: str): | |
| """Safely get a field value from GGUFReader.""" | |
| field = reader.get_field(key) | |
| return field.contents() if field else None | |
| def calculate_on_disk_sizes(tensors, file_size): | |
| """Calculate on-disk size for each tensor (including per-row metadata/padding).""" | |
| n_tensors = len(tensors) | |
| sizes = [] | |
| for i in range(n_tensors): | |
| if i < n_tensors - 1: | |
| sizes.append(tensors[i + 1].data_offset - tensors[i].data_offset) | |
| else: | |
| sizes.append(file_size - tensors[i].data_offset) | |
| return sizes | |
| def write_kv_value(fout, kv_type, value): | |
| """Write a KV value to the output file.""" | |
| if kv_type == GGUFValueType.STRING: | |
| value_bytes = value.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(value_bytes))) | |
| fout.write(value_bytes) | |
| elif kv_type == GGUFValueType.ARRAY: | |
| # This is handled separately in the main code | |
| pass | |
| elif kv_type in (GGUFValueType.UINT8, GGUFValueType.INT8, GGUFValueType.BOOL): | |
| fout.write(struct.pack("<B", value)) | |
| elif kv_type in (GGUFValueType.UINT16, GGUFValueType.INT16): | |
| fout.write(struct.pack("<H", value)) | |
| elif kv_type in (GGUFValueType.UINT32, GGUFValueType.INT32): | |
| fout.write(struct.pack("<I", value)) | |
| elif kv_type == GGUFValueType.FLOAT32: | |
| fout.write(struct.pack("<f", value)) | |
| elif kv_type in (GGUFValueType.UINT64, GGUFValueType.INT64): | |
| fout.write(struct.pack("<Q", value)) | |
| elif kv_type == GGUFValueType.FLOAT64: | |
| fout.write(struct.pack("<d", value)) | |
| def write_array_value(fout, sub_type, arr): | |
| """Write an array KV value to the output file.""" | |
| fout.write(struct.pack("<I", int(sub_type))) | |
| fout.write(struct.pack("<Q", len(arr))) | |
| for elem in arr: | |
| if sub_type == GGUFValueType.STRING: | |
| elem_bytes = elem.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(elem_bytes))) | |
| fout.write(elem_bytes) | |
| elif sub_type in (GGUFValueType.UINT8, GGUFValueType.INT8, GGUFValueType.BOOL): | |
| fout.write(struct.pack("<B", elem)) | |
| elif sub_type in (GGUFValueType.UINT16, GGUFValueType.INT16): | |
| fout.write(struct.pack("<H", elem)) | |
| elif sub_type in (GGUFValueType.UINT32, GGUFValueType.INT32): | |
| fout.write(struct.pack("<I", elem)) | |
| elif sub_type == GGUFValueType.FLOAT32: | |
| fout.write(struct.pack("<f", elem)) | |
| elif sub_type in (GGUFValueType.UINT64, GGUFValueType.INT64): | |
| fout.write(struct.pack("<Q", elem)) | |
| elif sub_type == GGUFValueType.FLOAT64: | |
| fout.write(struct.pack("<d", elem)) | |
| def main() -> None: | |
| if len(sys.argv) != 4: | |
| print( | |
| f"Usage: {sys.argv[0]} <target.gguf> <source.gguf> <output.gguf>", | |
| file=sys.stderr, | |
| ) | |
| sys.exit(1) | |
| target_path, source_path, output_path = sys.argv[1], sys.argv[2], sys.argv[3] | |
| # ------------------------------------------------------------------ | |
| # 1. Open both files | |
| # ------------------------------------------------------------------ | |
| print(f"Reading target: {target_path}") | |
| target_reader = GGUFReader(target_path) | |
| print(f"Reading source: {source_path}") | |
| source_reader = GGUFReader(source_path) | |
| target_file_size = Path(target_path).stat().st_size | |
| source_file_size = Path(source_path).stat().st_size | |
| print( | |
| f" Target tensors: {len(target_reader.tensors)}, KVs: {len([k for k in target_reader.fields if not k.startswith('GGUF.')])}" | |
| ) | |
| print( | |
| f" Source tensors: {len(source_reader.tensors)}, KVs: {len([k for k in source_reader.fields if not k.startswith('GGUF.')])}" | |
| ) | |
| # ------------------------------------------------------------------ | |
| # 2. Read architecture and MTP metadata from source | |
| # ------------------------------------------------------------------ | |
| arch = get_field_value(target_reader, "general.architecture") | |
| if arch is None: | |
| print("ERROR: Target GGUF has no general.architecture key") | |
| sys.exit(1) | |
| source_block_count = get_field_value(source_reader, f"{arch}.block_count") | |
| source_nextn = get_field_value(source_reader, f"{arch}.nextn_predict_layers") | |
| if source_nextn is None: | |
| print("ERROR: Source GGUF has no nextn_predict_layers key") | |
| sys.exit(1) | |
| target_block_count = get_field_value(target_reader, f"{arch}.block_count") | |
| print(f"\n Arch: {arch}") | |
| print(f" Target block_count: {target_block_count}") | |
| print( | |
| f" Source block_count: {source_block_count}, nextn_predict_layers: {source_nextn}" | |
| ) | |
| # Identify extra tensors in the source (blocks beyond target's count) | |
| source_extra = [ | |
| t | |
| for t in source_reader.tensors | |
| if t.name.startswith(f"blk.{target_block_count}.") | |
| ] | |
| print(f"\n Extra tensors to transplant: {len(source_extra)}") | |
| if not source_extra: | |
| print( | |
| f"ERROR: No tensors found with prefix 'blk.{target_block_count}.' in source" | |
| ) | |
| sys.exit(1) | |
| # ------------------------------------------------------------------ | |
| # 3. Prepare tensor lists and calculate sizes | |
| # ------------------------------------------------------------------ | |
| # Combine tensors: all from target + extra from source | |
| all_tensors = list(target_reader.tensors) + source_extra | |
| # Calculate on-disk sizes for source tensors (including per-row metadata) | |
| target_on_disk_sizes = calculate_on_disk_sizes( | |
| target_reader.tensors, target_file_size | |
| ) | |
| source_on_disk_sizes = calculate_on_disk_sizes( | |
| source_reader.tensors, source_file_size | |
| ) | |
| # Create mapping for source tensors | |
| source_tensor_map = { | |
| t.name: (t, size) | |
| for t, size in zip(source_reader.tensors, source_on_disk_sizes) | |
| } | |
| # ------------------------------------------------------------------ | |
| # 4. Write output file | |
| # ------------------------------------------------------------------ | |
| print(f"\nWriting output: {output_path}") | |
| with ( | |
| open(target_path, "rb") as target_fin, | |
| open(source_path, "rb") as source_fin, | |
| open(output_path, "wb") as fout, | |
| ): | |
| # 4.1 Write header | |
| # Magic (4 bytes) | |
| fout.write(b"GGUF") | |
| # Version (4 bytes) | |
| fout.write(struct.pack("<I", 3)) | |
| # Tensor count (8 bytes) | |
| fout.write(struct.pack("<Q", len(all_tensors))) | |
| # Calculate KV count | |
| kv_count = len( | |
| [k for k in target_reader.fields.keys() if not k.startswith("GGUF.")] | |
| ) | |
| kv_count += 1 # block_count override | |
| # Add source-only KVs (excluding block_count and nextn_predict_layers) | |
| for key in source_reader.fields: | |
| if ( | |
| not key.startswith("GGUF.") | |
| and key not in target_reader.fields | |
| and key != f"{arch}.block_count" | |
| and key != f"{arch}.nextn_predict_layers" | |
| ): | |
| kv_count += 1 | |
| # KV count (8 bytes) | |
| fout.write(struct.pack("<Q", kv_count)) | |
| # 4.2 Write KV data from target (with block_count override) | |
| written_keys = set() | |
| for key, field in target_reader.fields.items(): | |
| if key.startswith("GGUF."): | |
| continue | |
| # Skip block_count (we'll override it) | |
| if key == f"{arch}.block_count": | |
| continue | |
| # Write key | |
| key_bytes = key.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(key_bytes))) | |
| fout.write(key_bytes) | |
| # Write type | |
| kv_type = field.types[0] | |
| fout.write(struct.pack("<I", int(kv_type))) | |
| # Write value | |
| if kv_type == GGUFValueType.STRING: | |
| write_kv_value(fout, kv_type, field.contents()) | |
| elif kv_type == GGUFValueType.ARRAY: | |
| sub_type = ( | |
| field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32 | |
| ) | |
| write_array_value(fout, sub_type, field.contents()) | |
| else: | |
| write_kv_value(fout, kv_type, field.contents()) | |
| written_keys.add(key) | |
| # Add block_count from source | |
| key = f"{arch}.block_count" | |
| key_bytes = key.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(key_bytes))) | |
| fout.write(key_bytes) | |
| fout.write(struct.pack("<I", int(GGUFValueType.UINT32))) | |
| fout.write(struct.pack("<I", source_block_count)) | |
| written_keys.add(key) | |
| # Add nextn_predict_layers from source | |
| key = f"{arch}.nextn_predict_layers" | |
| key_bytes = key.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(key_bytes))) | |
| fout.write(key_bytes) | |
| fout.write(struct.pack("<I", int(GGUFValueType.UINT32))) | |
| fout.write(struct.pack("<I", source_nextn)) | |
| written_keys.add(key) | |
| # Copy source-only KVs | |
| for key, field in source_reader.fields.items(): | |
| if ( | |
| key.startswith("GGUF.") | |
| or key in written_keys | |
| or key == f"{arch}.nextn_predict_layers" | |
| ): | |
| continue | |
| # Write key | |
| key_bytes = key.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(key_bytes))) | |
| fout.write(key_bytes) | |
| # Write type | |
| kv_type = field.types[0] | |
| fout.write(struct.pack("<I", int(kv_type))) | |
| # Write value | |
| if kv_type == GGUFValueType.STRING: | |
| write_kv_value(fout, kv_type, field.contents()) | |
| elif kv_type == GGUFValueType.ARRAY: | |
| sub_type = ( | |
| field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32 | |
| ) | |
| write_array_value(fout, sub_type, field.contents()) | |
| else: | |
| write_kv_value(fout, kv_type, field.contents()) | |
| # 4.3 Write tensor info | |
| # Calculate offsets for all tensors | |
| current_offset = 0 | |
| tensor_offsets = [] | |
| for i, tensor in enumerate(all_tensors): | |
| if i < len(target_reader.tensors): | |
| size = target_on_disk_sizes[i] | |
| else: | |
| _, size = source_tensor_map[tensor.name] | |
| tensor_offsets.append(current_offset) | |
| current_offset += size | |
| # Write tensor info for each tensor | |
| for i, tensor in enumerate(all_tensors): | |
| # Tensor name | |
| name_bytes = tensor.name.encode("utf-8") | |
| fout.write(struct.pack("<Q", len(name_bytes))) | |
| fout.write(name_bytes) | |
| # Dimensions (in GGUF file order: fastest-varying first) | |
| shape = tensor.shape.tolist() | |
| fout.write(struct.pack("<I", len(shape))) | |
| for dim in shape: | |
| fout.write(struct.pack("<Q", dim)) | |
| # Quantization type | |
| fout.write(struct.pack("<I", int(tensor.tensor_type))) | |
| # Offset | |
| fout.write(struct.pack("<Q", tensor_offsets[i])) | |
| # 4.4 Pad to alignment if needed | |
| current_pos = fout.tell() | |
| alignment = get_field_value(target_reader, "general.alignment") or 32 | |
| padding_needed = (alignment - (current_pos % alignment)) % alignment | |
| if padding_needed: | |
| fout.write(b"\x00" * padding_needed) | |
| # 4.5 Copy tensor data | |
| print(f"Copying {len(all_tensors)} tensors...") | |
| for i, tensor in enumerate(all_tensors): | |
| if i < len(target_reader.tensors): | |
| # Target tensor | |
| offset = target_reader.tensors[i].data_offset | |
| size = target_on_disk_sizes[i] | |
| fin = target_fin | |
| else: | |
| # Source extra tensor | |
| src_tensor, size = source_tensor_map[tensor.name] | |
| offset = src_tensor.data_offset | |
| fin = source_fin | |
| fin.seek(offset) | |
| raw_data = fin.read(size) | |
| fout.write(raw_data) | |
| if (i + 1) % 50 == 0 or i == len(all_tensors) - 1: | |
| print(f" Copied {i + 1}/{len(all_tensors)} tensors") | |
| # ------------------------------------------------------------------ | |
| # 5. Verify output | |
| # ------------------------------------------------------------------ | |
| output_size = Path(output_path).stat().st_size | |
| print(f"\nOutput: {output_path}") | |
| print(f" Size: {output_size / 1_000_000_000:.2f} GB") | |
| print(f" Tensors: {len(all_tensors)}") | |
| # Validate | |
| print("\nValidating output...") | |
| errors = [] | |
| try: | |
| out_reader = GGUFReader(output_path) | |
| # Check block_count | |
| out_block_count = get_field_value(out_reader, f"{arch}.block_count") | |
| if out_block_count != source_block_count: | |
| errors.append( | |
| f"block_count: expected {source_block_count}, got {out_block_count}" | |
| ) | |
| # Check nextn_predict_layers | |
| out_nextn = get_field_value(out_reader, f"{arch}.nextn_predict_layers") | |
| if out_nextn != source_nextn: | |
| errors.append( | |
| f"nextn_predict_layers: expected {source_nextn}, got {out_nextn}" | |
| ) | |
| # Check extra tensors exist | |
| out_tensor_names = {t.name for t in out_reader.tensors} | |
| for tensor in source_extra: | |
| if tensor.name not in out_tensor_names: | |
| errors.append(f"Missing tensor: {tensor.name}") | |
| # Spot-check tensor data integrity | |
| print(" Spot-checking tensor data integrity...") | |
| out_tensors = {t.name: t for t in out_reader.tensors} | |
| # Check a target tensor | |
| for name in ["token_embd.weight"]: | |
| if name in out_tensors and name in {t.name for t in target_reader.tensors}: | |
| target_t = next( | |
| (t for t in target_reader.tensors if t.name == name), None | |
| ) | |
| out_t = out_tensors.get(name) | |
| if target_t and out_t: | |
| target_hash = hashlib.sha256(target_t.data.tobytes()).hexdigest()[ | |
| :16 | |
| ] | |
| out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16] | |
| if target_hash == out_hash: | |
| print(f" {name}: OK ({out_hash})") | |
| else: | |
| errors.append(f"Data mismatch: {name}") | |
| # Check an extra tensor | |
| if source_extra: | |
| extra_name = source_extra[0].name | |
| source_t = source_tensor_map[extra_name][0] | |
| out_t = out_tensors.get(extra_name) | |
| if out_t: | |
| source_hash = hashlib.sha256(source_t.data.tobytes()).hexdigest()[:16] | |
| out_hash = hashlib.sha256(out_t.data.tobytes()).hexdigest()[:16] | |
| if source_hash == out_hash: | |
| print(f" {extra_name}: OK ({out_hash})") | |
| else: | |
| errors.append(f"Data mismatch: {extra_name}") | |
| except Exception as e: | |
| errors.append(f"Failed to read output: {e}") | |
| if errors: | |
| print("\nVALIDATION FAILED:") | |
| for err in errors: | |
| print(f" - {err}") | |
| sys.exit(1) | |
| else: | |
| print(" OK — all checks passed") | |
| print(f"\nDone. Output: {output_path}") | |
| if __name__ == "__main__": | |
| main() |
