cd /news/large-language-models/transplant-mtp-block-from-one-gguf-f… · home topics large-language-models article
[ARTICLE · art-15843] src=gist.github.com pub= topic=large-language-models verified=true sentiment=· neutral

Transplant MTP block from one GGUF file into another

A developer has released a Python script that transplants extra tensors—such as Multi-Token Prediction (MTP) layers—from one GGUF file into another, enabling the creation of mixed-quantization models. The tool preserves the exact on-disk layout, including per-row metadata critical for GPU inference, and supports using smaller "tensors-only" donor files to save bandwidth. Example usage includes transplanting an MTP block from a Q8_0 quantized file into an IQ4_KS base model.

read 12 min · published May 1, 2026

#!/usr/bin/env python3
"""
Transplant extra tensors (e.g. MTP layers) from one GGUF file into another,
producing a mixed-quantization GGUF.

Note: Tested with ik_llama.cpp GGUF Python module.

Usage:
    python convert.py <target.gguf> <source.gguf> <output.gguf>

Arguments:
    target — base GGUF (tensors + metadata kept as-is)
    source — GGUF with extra blocks to transplant (e.g. blk.64.* for MTP)
    output — resulting mixed-quantization GGUF

The script preserves the exact on-disk layout including per-row metadata
for quantization types like IQ4_KS that have row_meta_size > 0. This is
critical for GPU inference to work correctly.

Donor Models:
    To save bandwidth, you can use 'tensors-only' GGUFs as the source file.
    Credits to AzerbaijanNyan for the extraction:
    https://www.reddit.com/r/LocalLLaMA/comments/1t6r1ny/extracted_mtp_tensor_ggufs_smaller_donor_models/

    Available donors:
    - Qwen3.6-35A3B: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-35A3B-MTP-TENSORS-ONLY
    - Qwen3.6-27b: https://huggingface.co/IHaveNoClueAndIMustPost/Qwen3.6-27b-MTP-TENSORS-ONLY

Example:
    # Transplant MTP block from Q8_0 into IQ4_KS base model
    python convert.py Qwen3.6-27B-IQ4_KS.gguf Qwen3.6-27B-MTP-Q8_0.gguf Qwen3.6-27B-MTP-IQ4_KS.gguf
"""

import hashlib
import sys
import struct
from pathlib import Path

from gguf import GGUFReader, GGUFValueType

# Little-endian struct formats for the fixed-width GGUF scalar types.
# Signed types use the signed format codes so negative values round-trip
# instead of raising struct.error.
_SCALAR_FORMATS = {
    GGUFValueType.UINT8: "<B",
    GGUFValueType.INT8: "<b",
    GGUFValueType.BOOL: "<?",
    GGUFValueType.UINT16: "<H",
    GGUFValueType.INT16: "<h",
    GGUFValueType.UINT32: "<I",
    GGUFValueType.INT32: "<i",
    GGUFValueType.FLOAT32: "<f",
    GGUFValueType.UINT64: "<Q",
    GGUFValueType.INT64: "<q",
    GGUFValueType.FLOAT64: "<d",
}


def get_field_value(reader: GGUFReader, key: str):
    """Return the decoded contents of field *key*, or None if it is absent."""
    field = reader.get_field(key)
    return field.contents() if field is not None else None


def calculate_on_disk_sizes(tensors, file_size):
    """Calculate on-disk size for each tensor (including per-row metadata/padding).

    The size of a tensor is the distance from its ``data_offset`` to the
    ``data_offset`` of the next tensor in *tensors*; the last tensor extends
    to *file_size*. The tensors are therefore expected to be listed in file
    order.

    Args:
        tensors: Sequence of objects exposing a ``data_offset`` attribute.
        file_size: Total size in bytes of the file the tensors live in.

    Returns:
        A list of ``int`` byte sizes, one per tensor, in the same order.

    Raises:
        ValueError: If a computed size is negative (tensors not in file order,
            or *file_size* smaller than the last offset). A negative size must
            never reach ``file.read()``, where it would mean "read to EOF".
    """
    offsets = [int(t.data_offset) for t in tensors]
    boundaries = offsets[1:] + [int(file_size)]
    sizes = []
    for index, (start, end) in enumerate(zip(offsets, boundaries)):
        size = end - start
        if size < 0:
            raise ValueError(
                f"Tensor #{index} has a negative on-disk size ({size}); "
                "tensors must be listed in file order"
            )
        sizes.append(size)
    return sizes


def write_kv_value(fout, kv_type, value):
    """Write a KV value to the output file.

    Args:
        fout: Binary file object opened for writing.
        kv_type: The ``GGUFValueType`` of the value.
        value: The value to serialize (``str`` for STRING, a number or bool
            for the scalar types).

    Raises:
        ValueError: If *kv_type* is not a known GGUF value type.

    ARRAY values are intentionally not written here: they need an element
    type and a count, which ``write_array_value`` emits.
    """
    if kv_type == GGUFValueType.STRING:
        # Strings are a uint64 byte length followed by the UTF-8 bytes.
        value_bytes = value.encode("utf-8")
        fout.write(struct.pack("<Q", len(value_bytes)))
        fout.write(value_bytes)
        return

    if kv_type == GGUFValueType.ARRAY:
        # This is handled separately in the main code
        return

    fmt = _SCALAR_FORMATS.get(kv_type)
    if fmt is None:
        # Writing nothing here would silently corrupt the KV section.
        raise ValueError(f"Unsupported GGUF value type: {kv_type!r}")
    fout.write(struct.pack(fmt, value))
def write_array_value(fout, sub_type, arr):
    """Write an array KV value to the output file.

    The layout is: element type (uint32), element count (uint64), then the
    elements back to back, all little-endian.

    Args:
        fout: Binary file object opened for writing.
        sub_type: The ``GGUFValueType`` of the array elements.
        arr: Sequence of elements (``str`` for STRING arrays, numbers or
            bools otherwise).

    Raises:
        ValueError: If a non-empty array has an element type that cannot be
            serialized here (e.g. nested arrays). Emitting the header without
            the elements would corrupt the file.
    """
    count = len(arr)
    fout.write(struct.pack("<I", int(sub_type)))
    fout.write(struct.pack("<Q", count))
    if count == 0:
        # Nothing follows the header, whatever the element type is.
        return

    if sub_type == GGUFValueType.STRING:
        # Each string is a uint64 byte length followed by its UTF-8 bytes.
        for elem in arr:
            elem_bytes = elem.encode("utf-8")
            fout.write(struct.pack("<Q", len(elem_bytes)))
            fout.write(elem_bytes)
        return

    # Signed types use signed format codes so negative elements are written
    # correctly instead of raising struct.error.
    element_codes = {
        GGUFValueType.UINT8: "B",
        GGUFValueType.INT8: "b",
        GGUFValueType.BOOL: "?",
        GGUFValueType.UINT16: "H",
        GGUFValueType.INT16: "h",
        GGUFValueType.UINT32: "I",
        GGUFValueType.INT32: "i",
        GGUFValueType.FLOAT32: "f",
        GGUFValueType.UINT64: "Q",
        GGUFValueType.INT64: "q",
        GGUFValueType.FLOAT64: "d",
    }
    code = element_codes.get(sub_type)
    if code is None:
        raise ValueError(f"Unsupported GGUF array element type: {sub_type!r}")

    # One pack call for the whole array; the bytes are identical to packing
    # the elements one by one.
    fout.write(struct.pack(f"<{count}{code}", *arr))
def _fail(message: str) -> None:
    """Report a fatal error on stderr and exit with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def _block_index(tensor_name: str):
    """Return N for a tensor named 'blk.N.<rest>', or None for other names."""
    parts = tensor_name.split(".", 2)
    if len(parts) == 3 and parts[0] == "blk":
        if parts[1].isascii() and parts[1].isdigit():
            return int(parts[1])
    return None


def _write_key(fout, key: str) -> None:
    """Write a KV key as a uint64 byte length followed by its UTF-8 bytes."""
    key_bytes = key.encode("utf-8")
    fout.write(struct.pack("<Q", len(key_bytes)))
    fout.write(key_bytes)


def _write_field(fout, key: str, field) -> None:
    """Write one reader field (key, value type, value) to the KV section."""
    _write_key(fout, key)
    kv_type = field.types[0]
    fout.write(struct.pack("<I", int(kv_type)))
    if kv_type == GGUFValueType.ARRAY:
        sub_type = field.types[1] if len(field.types) > 1 else GGUFValueType.FLOAT32
        write_array_value(fout, sub_type, field.contents())
    else:
        write_kv_value(fout, kv_type, field.contents())


def _write_uint32_kv(fout, key: str, value: int) -> None:
    """Write a KV entry whose value is a single UINT32."""
    _write_key(fout, key)
    fout.write(struct.pack("<I", int(GGUFValueType.UINT32)))
    fout.write(struct.pack("<I", value))


def _copy_range(src, dst, offset: int, size: int, chunk_size: int = 16 * 1024 * 1024) -> None:
    """Copy exactly *size* bytes from *src* at *offset* to *dst*, in chunks.

    Raises OSError if *src* ends before *size* bytes were read, so a truncated
    input cannot silently produce a short tensor.
    """
    src.seek(offset)
    remaining = size
    while remaining > 0:
        chunk = src.read(min(chunk_size, remaining))
        if not chunk:
            raise OSError(
                f"Unexpected end of file: {remaining} bytes missing "
                f"from range starting at offset {offset}"
            )
        dst.write(chunk)
        remaining -= len(chunk)


def _short_digest(tensor) -> str:
    """Return the first 16 hex chars of the SHA-256 of a tensor's data."""
    return hashlib.sha256(tensor.data.tobytes()).hexdigest()[:16]


def main() -> None:
    """Transplant the extra blocks of a source GGUF into a target GGUF.

    Reads ``<target.gguf> <source.gguf> <output.gguf>`` from ``sys.argv``.
    The output holds all target tensors and metadata, the tensors of every
    source block whose index is >= the target's ``block_count``, the source's
    ``block_count`` and ``nextn_predict_layers``, and any metadata keys that
    exist only in the source. Tensor bytes are copied verbatim from disk.
    The result is re-opened and spot-checked; the process exits with status 1
    on usage errors, missing metadata, or failed validation.
    """
    if len(sys.argv) != 4:
        print(
            f"Usage: {sys.argv[0]} <target.gguf> <source.gguf> <output.gguf>",
            file=sys.stderr,
        )
        sys.exit(1)

    target_path, source_path, output_path = sys.argv[1], sys.argv[2], sys.argv[3]

    # Opening the output with "wb" truncates it, so it must not be an input.
    output_resolved = Path(output_path).resolve()
    if output_resolved in (Path(target_path).resolve(), Path(source_path).resolve()):
        _fail("ERROR: Output path must differ from the target and source paths")

    # ------------------------------------------------------------------
    # 1. Open both files
    # ------------------------------------------------------------------
    print(f"Reading target: {target_path}")
    target_reader = GGUFReader(target_path)
    print(f"Reading source: {source_path}")
    source_reader = GGUFReader(source_path)

    target_file_size = Path(target_path).stat().st_size
    source_file_size = Path(source_path).stat().st_size

    print(
        f" Target tensors: {len(target_reader.tensors)}, KVs: {len([k for k in target_reader.fields if not k.startswith('GGUF.')])}"
    )
    print(
        f" Source tensors: {len(source_reader.tensors)}, KVs: {len([k for k in source_reader.fields if not k.startswith('GGUF.')])}"
    )

    # ------------------------------------------------------------------
    # 2. Read architecture and MTP metadata from source
    # ------------------------------------------------------------------
    arch = get_field_value(target_reader, "general.architecture")
    if arch is None:
        _fail("ERROR: Target GGUF has no general.architecture key")

    block_count_key = f"{arch}.block_count"
    nextn_key = f"{arch}.nextn_predict_layers"

    source_block_count = get_field_value(source_reader, block_count_key)
    if source_block_count is None:
        _fail("ERROR: Source GGUF has no block_count key")

    source_nextn = get_field_value(source_reader, nextn_key)
    if source_nextn is None:
        _fail("ERROR: Source GGUF has no nextn_predict_layers key")

    target_block_count = get_field_value(target_reader, block_count_key)
    if target_block_count is None:
        _fail("ERROR: Target GGUF has no block_count key")

    print(f"\n Arch: {arch}")
    print(f" Target block_count: {target_block_count}")
    print(
        f" Source block_count: {source_block_count}, nextn_predict_layers: {source_nextn}"
    )

    # Identify extra tensors in the source (blocks beyond target's count).
    # Every block index >= target_block_count is taken, so sources carrying
    # more than one extra block are handled too. Names already present in the
    # target are skipped to avoid duplicate tensor entries.
    target_names = {t.name for t in target_reader.tensors}
    source_extra = []
    for t in source_reader.tensors:
        index = _block_index(t.name)
        if index is not None and index >= target_block_count and t.name not in target_names:
            source_extra.append(t)

    print(f"\n Extra tensors to transplant: {len(source_extra)}")
    if not source_extra:
        _fail(
            f"ERROR: No tensors found with prefix 'blk.{target_block_count}.' "
            "(or any later block) in source"
        )

    # ------------------------------------------------------------------
    # 3. Prepare tensor lists and calculate sizes
    # ------------------------------------------------------------------
    # Combine tensors: all from target + extra from source
    all_tensors = list(target_reader.tensors) + source_extra

    # On-disk sizes include per-row metadata and any inter-tensor padding.
    target_on_disk_sizes = calculate_on_disk_sizes(
        target_reader.tensors, target_file_size
    )
    source_on_disk_sizes = calculate_on_disk_sizes(
        source_reader.tensors, source_file_size
    )
    source_tensor_map = {
        t.name: (t, size)
        for t, size in zip(source_reader.tensors, source_on_disk_sizes)
    }

    alignment = int(get_field_value(target_reader, "general.alignment") or 32)

    # Copy plan: (read from source?, absolute input offset, byte count).
    copy_plan = [
        (False, int(t.data_offset), int(size))
        for t, size in zip(target_reader.tensors, target_on_disk_sizes)
    ]
    for t in source_extra:
        src_tensor, size = source_tensor_map[t.name]
        copy_plan.append((True, int(src_tensor.data_offset), int(size)))

    # Output offsets are relative to the start of the data section. Each
    # tensor is rounded up to the alignment so the next one starts aligned
    # even when an on-disk size is not a multiple of it (e.g. the last tensor
    # of an input file written without trailing padding).
    tensor_offsets = []
    current_offset = 0
    for _, _, size in copy_plan:
        tensor_offsets.append(current_offset)
        current_offset += size + (-size % alignment)

    # Metadata: target KVs first, then source-only KVs. block_count and
    # nextn_predict_layers always come from the source and are written
    # exactly once, so they are excluded from both lists.
    overridden = {block_count_key, nextn_key}
    target_kvs = [
        (key, field)
        for key, field in target_reader.fields.items()
        if not key.startswith("GGUF.") and key not in overridden
    ]
    written_keys = {key for key, _ in target_kvs} | overridden
    source_only_kvs = [
        (key, field)
        for key, field in source_reader.fields.items()
        if not key.startswith("GGUF.") and key not in written_keys
    ]
    # Derived from what is actually written, so the header cannot disagree
    # with the KV section.
    kv_count = len(target_kvs) + len(overridden) + len(source_only_kvs)

    # ------------------------------------------------------------------
    # 4. Write output file
    # ------------------------------------------------------------------
    print(f"\nWriting output: {output_path}")
    with (
        open(target_path, "rb") as target_fin,
        open(source_path, "rb") as source_fin,
        open(output_path, "wb") as fout,
    ):
        # 4.1 Write header: magic, version, tensor count, KV count
        fout.write(b"GGUF")
        fout.write(struct.pack("<I", 3))
        fout.write(struct.pack("<Q", len(all_tensors)))
        fout.write(struct.pack("<Q", kv_count))

        # 4.2 Write KV data from target, the two overrides, then source-only KVs
        for key, field in target_kvs:
            _write_field(fout, key, field)
        _write_uint32_kv(fout, block_count_key, source_block_count)
        _write_uint32_kv(fout, nextn_key, source_nextn)
        for key, field in source_only_kvs:
            _write_field(fout, key, field)

        # 4.3 Write tensor info
        for tensor, out_offset in zip(all_tensors, tensor_offsets):
            name_bytes = tensor.name.encode("utf-8")
            fout.write(struct.pack("<Q", len(name_bytes)))
            fout.write(name_bytes)
            # Dimensions (in GGUF file order: fastest-varying first)
            shape = tensor.shape.tolist()
            fout.write(struct.pack("<I", len(shape)))
            for dim in shape:
                fout.write(struct.pack("<Q", dim))
            # Quantization type
            fout.write(struct.pack("<I", int(tensor.tensor_type)))
            # Offset
            fout.write(struct.pack("<Q", out_offset))

        # 4.4 Pad to alignment if needed
        padding_needed = -fout.tell() % alignment
        if padding_needed:
            fout.write(b"\x00" * padding_needed)

        # 4.5 Copy tensor data
        print(f"Copying {len(all_tensors)} tensors...")
        for i, (from_source, src_offset, size) in enumerate(copy_plan):
            fin = source_fin if from_source else target_fin
            _copy_range(fin, fout, src_offset, size)
            tail_padding = -size % alignment
            if tail_padding:
                fout.write(b"\x00" * tail_padding)
            if (i + 1) % 50 == 0 or i == len(all_tensors) - 1:
                print(f" Copied {i + 1}/{len(all_tensors)} tensors")

    # ------------------------------------------------------------------
    # 5. Verify output
    # ------------------------------------------------------------------
    output_size = Path(output_path).stat().st_size
    print(f"\nOutput: {output_path}")
    print(f" Size: {output_size / 1_000_000_000:.2f} GB")
    print(f" Tensors: {len(all_tensors)}")

    print("\nValidating output...")
    errors = []
    try:
        out_reader = GGUFReader(output_path)

        # Check block_count
        out_block_count = get_field_value(out_reader, block_count_key)
        if out_block_count != source_block_count:
            errors.append(
                f"block_count: expected {source_block_count}, got {out_block_count}"
            )

        # Check nextn_predict_layers
        out_nextn = get_field_value(out_reader, nextn_key)
        if out_nextn != source_nextn:
            errors.append(
                f"nextn_predict_layers: expected {source_nextn}, got {out_nextn}"
            )

        # Check extra tensors exist
        out_tensors = {t.name: t for t in out_reader.tensors}
        for tensor in source_extra:
            if tensor.name not in out_tensors:
                errors.append(f"Missing tensor: {tensor.name}")

        # Spot-check tensor data integrity
        print(" Spot-checking tensor data integrity...")
        target_tensors = {t.name: t for t in target_reader.tensors}

        # Check a target tensor
        for name in ["token_embd.weight"]:
            target_t = target_tensors.get(name)
            out_t = out_tensors.get(name)
            if target_t and out_t:
                out_hash = _short_digest(out_t)
                if _short_digest(target_t) == out_hash:
                    print(f" {name}: OK ({out_hash})")
                else:
                    errors.append(f"Data mismatch: {name}")

        # Check an extra tensor
        extra_name = source_extra[0].name
        source_t = source_tensor_map[extra_name][0]
        out_t = out_tensors.get(extra_name)
        if out_t:
            out_hash = _short_digest(out_t)
            if _short_digest(source_t) == out_hash:
                print(f" {extra_name}: OK ({out_hash})")
            else:
                errors.append(f"Data mismatch: {extra_name}")
    except Exception as e:
        # Any failure to parse the output is a validation failure, not a crash.
        errors.append(f"Failed to read output: {e}")

    if errors:
        print("\nVALIDATION FAILED:", file=sys.stderr)
        for err in errors:
            print(f" - {err}", file=sys.stderr)
        sys.exit(1)
    else:
        print(" OK — all checks passed")

    print(f"\nDone. Output: {output_path}")


if __name__ == "__main__":
    main()
── more in #large-language-models 4 stories · sorted by recency
sponsored brought to you by zahid.host 4,200+ EU-deployed projects
reading about agents? ship yours in a single git push.

Run your AI side-project on zahid.host

EU-based hosting, git-push deploys, automatic HTTPS, no cold starts. Free tier with a custom domain — perfect for shipping the agent you just read about.

$git push zahid main
Live at https://your-agent.zahid.host
Get free account → Pricing
from €0/mo · no card required
LIVE [news/transplant-mtp-block…] indexed:0 read:12min 2026-05-01 ·