# OpenAI-compatible proxy for DeepSeek V4 Flash with intelligent auto context compression features

> Source: <https://gist.github.com/g023/c2bb7b540ffe64cee76023f18f6f9365>
> Published: 2026-04-24 18:16:03+00:00

#!/usr/bin/env python3
"""
Zero-dependency OpenAI-compatible proxy for DeepSeek V4 Flash.

Author: g023
License: MIT

All client‑supplied model and generation parameters are **ignored**.
The proxy always uses the model, max output tokens, and other settings
defined in the global configuration (see --help and the constants below).

Optimisations:
  - System prompt compression (auto-summarized via DeepSeek API;
    originals stored in ./pre_sys/, summaries cached in ./post_sys/)
  - Markdown block deduplication (keeps only the latest occurrence full)
  - Conversation summarisation triggers when token budget is exceeded
  - Assistant reasoning is cached to avoid redundant re‑generation
  - Inter‑message content fingerprinting & deduplication (Feature F-1)
    - Removes repeated boilerplate segments (environment_info, userMemory,
      reminderInstructions, etc.) from user messages across conversation turns.
    - Segments are hashed (SHA‑256), duplicates replaced with an empty string
      (or a minimal placeholder if the message becomes empty).
    - Per‑conversation fingerprint storage with LRU eviction.

* Reads from local file K.dat for API key if DEEPSEEK_API_KEY env var is not set.

* This is just a proof-of-concept pet project. Do not expose this server to the internet.
"""

import argparse
import collections
import copy
import hashlib
import http.server
import json
import logging
import os
import re
import signal
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ==============================================================================
# Global configuration – these forcibly override every client request
# ==============================================================================
DEEPSEEK_BASE = "https://api.deepseek.com"     # API base URL; "/chat/completions" is appended per request
DEFAULT_MODEL  = "deepseek-v4-flash"           # model that will *always* be used (alternative: "deepseek-v4-pro")
MAX_CACHE_SIZE = 500                           # capacity of the LRU cache for assistant reasoning
MAX_CONTEXT    = 128000                        # context size in (estimated) tokens
SUMMARY_RATIO  = 0.8                           # trigger summarisation at 80 % of MAX_CONTEXT
SUMMARY_MODEL  = DEFAULT_MODEL                 # model used for the summarisation calls
MAX_OUTPUT_TOKENS = 128000                     # max tokens to generate (overrides client)
THINKING_MODE  = "auto"                        # "enabled", "disabled", or "auto" (default)

# --------------------------------------------------------------------------
# Local file save toggles – set to False to disable disk writes
# --------------------------------------------------------------------------
SAVE_PREPOST_MSGS   = False  # save pre/post message dumps to ./pre_msg/ and ./post_msg/
SAVE_PREPOST_SYSTEM = True   # save original/summarized system prompts to ./pre_sys/ and ./post_sys/

# --------------------------------------------------------------------------
# Retry configuration for summarisation calls
# --------------------------------------------------------------------------
SUMMARISE_MAX_RETRIES = 3          # total attempts per summarisation request
SUMMARISE_RETRY_BASE_SLEEP = 2.0   # seconds slept after the first failure, doubled each attempt

# --------------------------------------------------------------------------
# Feature F-1: Inter‑message content fingerprinting & deduplication
# --------------------------------------------------------------------------
MAX_FINGERPRINT_HISTORY = 100          # max number of segments stored per conversation

# Known boilerplate XML tags – each as (open_tag, close_tag).
# The dict keys are descriptive labels only; the code in this file iterates
# over .values() and matches on the literal tag strings.
_BOILERPLATE_PATTERNS = {
    "environment_info":           ("<environment_info>", "</environment_info>"),
    "workspace_info":             ("<workspace_info>", "</workspace_info>"),
    "userMemory":                 ("<userMemory>", "</userMemory>"),
    "sessionMemory":              ("<sessionMemory>", "</sessionMemory>"),
    "repoMemory":                 ("<repoMemory>", "</repoMemory>"),
    "context":                    ("<context>", "</context>"),
    "reminderInstructions":       ("<reminderInstructions>", "</reminderInstructions>"),
    "additional_skills_reminder": ("<additional_skills_reminder>", "</additional_skills_reminder>"),
    "editorContext":              ("<editorContext>", "</editorContext>"),
}

# Fingerprint storage: conv_id -> { segment_hash: (first_index, full_segment_text) }
# segment_hash is the SHA-256 hex digest of the whole segment, tags included.
_segment_fingerprints: Dict[str, Dict[str, Tuple[int, str]]] = {}
# Guards access to _segment_fingerprints.
_segment_fp_lock = threading.Lock()

# Read the DeepSeek API key
# Resolution order: the DEEPSEEK_API_KEY environment variable wins; otherwise
# the contents of ./K.dat are used. Surrounding whitespace (e.g. a trailing
# newline) is stripped in both cases because the key is sent verbatim in the
# Authorization header.
DSEEK_KEY = os.environ.get("DEEPSEEK_API_KEY", "").strip()
if not DSEEK_KEY:
    try:
        with open("K.dat", "r", encoding="utf-8") as f:
            DSEEK_KEY = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # Missing, unreadable or undecodable key file: treated as "no key".
        DSEEK_KEY = ""
    if not DSEEK_KEY:
        # An empty key would only surface later as HTTP 401s from upstream,
        # so refuse to start instead.
        print("ERROR: DEEPSEEK_API_KEY environment variable not set and K.dat not found or empty.",
              file=sys.stderr)
        sys.exit(1)

# ==============================================================================
# Logging / dev feedback – directories for pre/post messages
# ==============================================================================
# Target directories for the optional JSON message dumps (see SAVE_PREPOST_MSGS).
PRE_MSG_DIR = Path("./pre_msg")    # messages exactly as received from the client
POST_MSG_DIR = Path("./post_msg")  # presumably the messages after the pipeline ran – usage not visible here

def _save_json_message(dir_path: Path, prefix: str, messages: list, msg_hash: str):
    """
    Dump ``messages`` as tab-indented JSON into ``dir_path``.

    The file is named ``<YYYYmmdd_HHMMSS_micros>_<msg_hash>.json``. ``prefix``
    only appears in the log line emitted on failure. This is best-effort:
    every exception is logged with its traceback and swallowed.
    """
    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        target = dir_path / f"{stamp}_{msg_hash}.json"
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(messages, handle, ensure_ascii=False, indent="\t")
    except Exception:
        logging.exception(f"Failed to save {prefix} message dump")

def _md5_of_messages(messages: list) -> str:
    """
    Return the first 8 hex characters of the MD5 of ``messages``.

    The list is serialised as JSON with sorted keys, so the result does not
    depend on dict key order.
    """
    canonical = json.dumps(messages, sort_keys=True, ensure_ascii=False)
    digest = hashlib.md5(canonical.encode())
    return digest.hexdigest()[:8]

# ==============================================================================
# Improved token estimation – CJK‑aware heuristic
# ==============================================================================
# Inclusive code point ranges of the CJK ideograph blocks.
_CJK_RANGES = [
    (0x4E00, 0x9FFF),    # CJK Unified Ideographs
    (0x3400, 0x4DBF),    # CJK Unified Ideographs Extension A
    (0x20000, 0x2A6DF),  # Extension B
    (0x2A700, 0x2B73F),  # Extension C
    (0x2B740, 0x2B81F),  # Extension D
    (0x2B820, 0x2CEAF),  # Extension E
    (0x2CEB0, 0x2EBEF),  # Extension F
    (0x30000, 0x3134F),  # Extension G
    (0x31350, 0x323AF),  # Extension H
]
# Individual CJK / fullwidth punctuation code points that are counted as CJK.
_CJK_PUNCT = {
    0x3000, 0x3001, 0x3002,                          # ideographic space, comma, full stop
    0xFF0C, 0xFF0E, 0xFF1A, 0xFF1B, 0xFF01, 0xFF1F,  # fullwidth , . : ; ! ?
    0x300C, 0x300D, 0x300E, 0x300F,                  # corner brackets
    0x3010, 0x3011,                                  # lenticular brackets
    0x300A, 0x300B,                                  # double angle brackets
}


def _is_cjk(cp: int) -> bool:
    """Return True if code point ``cp`` is listed CJK punctuation or lies in a CJK ideograph range."""
    return cp in _CJK_PUNCT or any(lo <= cp <= hi for lo, hi in _CJK_RANGES)

def estimate_tokens(text: str) -> int:
    """
    Heuristic token count for ``text``.

      - Empty text: 0.
      - More than 50 % of the characters are CJK: one token per character.
      - Otherwise: one token per 3.5 characters (rounded down), which is
        conservative for code/English.

    Any non-empty text counts as at least one token.
    """
    length = len(text)
    if not length:
        return 0

    cjk_chars = 0
    for ch in text:
        if _is_cjk(ord(ch)):
            cjk_chars += 1

    mostly_cjk = cjk_chars / length > 0.5
    estimate = length if mostly_cjk else int(length // 3.5)
    return max(1, estimate)

# ==============================================================================
# LRU Cache for assistant reasoning
# ==============================================================================
class LRUCache:
    """Small least-recently-used mapping; every operation runs under an internal lock."""

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self._cache = collections.OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: str):
        """Return the value stored under ``key`` (marking it most recently used), or None."""
        with self._lock:
            try:
                # move_to_end raises KeyError for unknown keys, which doubles
                # as the membership test.
                self._cache.move_to_end(key)
            except KeyError:
                return None
            return self._cache[key]

    def set(self, key: str, value: dict):
        """Store ``value`` under ``key`` as the most recent entry; evict the oldest entry if over capacity."""
        with self._lock:
            entries = self._cache
            entries[key] = value
            entries.move_to_end(key)
            if len(entries) > self.maxsize:
                entries.popitem(last=False)

# Process-wide cache of assistant messages keyed by conversation-prefix hash
# (written by cache_assistant_message, read by inject_reasoning).
_assistant_cache = LRUCache(MAX_CACHE_SIZE)

# ==============================================================================
# Hashing utilities
# ==============================================================================
def _stable_hash(obj: Any) -> str:
    """Return the SHA-256 hex digest of ``obj`` serialised as JSON with sorted keys."""
    serialised = json.dumps(obj, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(serialised.encode()).hexdigest()

def _conv_hash(messages: List[dict]) -> str:
    """
    Hash a message list using only the fields that define conversation identity.

    Every other key (for example ``reasoning_content``) is ignored, so two
    lists that differ only in such keys hash the same.
    """
    identity_fields = ("role", "content", "tool_calls", "name", "tool_call_id")
    reduced = []
    for message in messages:
        reduced.append({field: value for field, value in message.items()
                        if field in identity_fields})
    return _stable_hash(reduced)

# ==============================================================================
# System prompt compression – auto-summarize via DeepSeek API
# ==============================================================================
PRE_SYS_DIR = Path("./pre_sys")    # original system prompts, one file per prompt hash
POST_SYS_DIR = Path("./post_sys")  # summarized system prompts, one file per prompt hash
_sys_lock = threading.Lock()       # serialises writes to the two directories


def _ensure_sys_dirs():
    """Create both system prompt directories if they do not exist yet."""
    for directory in (PRE_SYS_DIR, POST_SYS_DIR):
        directory.mkdir(parents=True, exist_ok=True)


def _sys_original_path(sys_hash: str) -> Path:
    """Location of the original system prompt identified by ``sys_hash``."""
    return PRE_SYS_DIR.joinpath(f"{sys_hash}.txt")


def _sys_summary_path(sys_hash: str) -> Path:
    """Location of the summarized system prompt identified by ``sys_hash``."""
    return POST_SYS_DIR.joinpath(f"{sys_hash}.txt")

def load_summarized_prompt(sys_hash: str) -> Optional[str]:
    """
    Fetch the cached summary for ``sys_hash`` from disk.

    Returns None when no summary file exists or when reading it fails
    (the failure is logged as a warning).
    """
    summary_file = _sys_summary_path(sys_hash)
    if not summary_file.exists():
        return None
    try:
        return summary_file.read_text(encoding="utf-8")
    except Exception:
        logging.warning(f"Failed to read summarized prompt {sys_hash}")
    return None

def save_original_prompt(sys_hash: str, content: str):
    """
    Atomically save the original system prompt to disk (thread-safe).

    Does nothing if a file for ``sys_hash`` already exists. The content is
    first written to a sibling ``.tmp`` file and then moved into place.
    Failures are logged as warnings and never raised (best-effort).
    """
    path = _sys_original_path(sys_hash)
    tmp_path = path.with_suffix(".tmp")
    with _sys_lock:
        # Checked under the lock so two threads cannot both decide to write.
        if path.exists():
            return  # already saved
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_text(content, encoding="utf-8")
            # replace() overwrites atomically on every platform; rename()
            # raises on Windows when the target already exists.
            tmp_path.replace(path)
        except Exception as e:
            logging.warning(f"Failed to save original prompt {sys_hash}: {e}")
            try:
                # Best-effort cleanup so a half-written temp file is not left behind.
                tmp_path.unlink()
            except OSError:
                pass

def save_summarized_prompt(sys_hash: str, content: str):
    """
    Atomically write a summarized prompt to disk (thread-safe).

    An existing summary for ``sys_hash`` is overwritten. The content is first
    written to a sibling ``.tmp`` file and then moved into place. Failures
    are logged as warnings and never raised (best-effort).
    """
    path = _sys_summary_path(sys_hash)
    tmp_path = path.with_suffix(".tmp")
    with _sys_lock:
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_text(content, encoding="utf-8")
            # replace() overwrites atomically on every platform; rename()
            # raises on Windows when the target already exists.
            tmp_path.replace(path)
        except Exception as e:
            logging.warning(f"Failed to write summarized prompt {sys_hash}: {e}")
            try:
                # Best-effort cleanup so a half-written temp file is not left behind.
                tmp_path.unlink()
            except OSError:
                pass

def summarize_system_prompt(original: str, api_key: str) -> str:
    """
    Ask DeepSeek for a compressed version of the system prompt ``original``.

    The request is attempted up to SUMMARISE_MAX_RETRIES times; after each
    failed attempt except the last, the function sleeps for
    SUMMARISE_RETRY_BASE_SLEEP seconds, doubled per attempt.

    Returns the ``content`` of the first choice in the response.
    Raises RuntimeError when every attempt failed.
    """
    instruction = (
        "You are a prompt compression assistant. Summarize the following system prompt "
        "as concisely as possible while preserving ALL critical instructions, constraints, "
        "formatting rules, and behavioral guidelines. Remove redundancy, examples, and "
        "verbose explanations. Output ONLY the compressed prompt — no commentary.\n\n"
    )
    request_body = {
        "model": SUMMARY_MODEL,
        "messages": [{"role": "user", "content": f"{instruction}{original}"}],
        "max_tokens": 2000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    failure = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        try:
            request = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=json.dumps(request_body).encode(),
                headers=request_headers,
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=120) as response:
                parsed = json.loads(response.read().decode())
                return parsed["choices"][0]["message"]["content"]
        except Exception as exc:
            failure = exc
            if attempt >= SUMMARISE_MAX_RETRIES:
                # Last attempt: report and fall through to the raise below.
                logging.error("System prompt summarization failed after %d attempts", SUMMARISE_MAX_RETRIES)
                continue
            delay = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
            logging.warning("System prompt summarization attempt %d failed, retrying in %.1fs: %s",
                            attempt, delay, exc)
            time.sleep(delay)
    raise RuntimeError(f"System prompt summarization failed: {failure}")

# ==============================================================================
# Markdown block deduplication (fixed – no index corruption)
# ==============================================================================
# Fenced block: opening fence (``` or ~~~), optional language tag, newline,
# then the body (group 3) up to the next occurrence of the same fence.
_FENCE_PATTERN = re.compile(r"(```|~~~)(\w*)\n(.*?)\1", re.DOTALL)

# Text that stands in for the body of an omitted (older duplicate) block.
_OMITTED_BLOCK_PLACEHOLDER = ".. (code omitted, see later version) .."


def deduplicate_markdown_blocks(messages: List[dict]) -> int:
    """
    Replace older duplicates of fenced code blocks with a short placeholder.

    Blocks are compared by the SHA-256 of their body (the text between the
    fences, language tag excluded). For each distinct body the **last**
    occurrence across all messages is kept in full; earlier occurrences have
    their body replaced by a placeholder. Only messages whose ``content`` is
    a string are examined. Messages are modified in place.

    Two safeguards keep the result well-formed and never larger:
      - if the body ended with a newline, the placeholder does too, so the
        closing fence stays on its own line;
      - a body is only replaced when the placeholder is strictly shorter.

    Returns the number of block bodies that were replaced.
    """
    # Pass 1: record every block and remember where each body occurs last.
    latest_position: Dict[str, Tuple[int, int]] = {}       # hash -> (msg_idx, start)
    spans_by_message: Dict[int, List[Tuple[int, int, str]]] = {}
    for msg_idx, msg in enumerate(messages):
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        for match in _FENCE_PATTERN.finditer(content):
            digest = hashlib.sha256(match.group(3).encode()).hexdigest()
            start, end = match.start(3), match.end(3)
            latest_position[digest] = (msg_idx, start)
            spans_by_message.setdefault(msg_idx, []).append((start, end, digest))

    # Pass 2: rebuild each affected message left to right. finditer yields
    # non-overlapping matches in ascending order, so the spans are sorted.
    replaced_count = 0
    for msg_idx, spans in spans_by_message.items():
        original = messages[msg_idx]["content"]
        pieces: List[str] = []
        cursor = 0
        for start, end, digest in spans:
            pieces.append(original[cursor:start])
            body = original[start:end]
            replacement = _OMITTED_BLOCK_PLACEHOLDER + ("\n" if body.endswith("\n") else "")
            is_latest = latest_position[digest] == (msg_idx, start)
            if is_latest or len(replacement) >= len(body):
                pieces.append(body)
            else:
                pieces.append(replacement)
                replaced_count += 1
            cursor = end
        pieces.append(original[cursor:])
        messages[msg_idx]["content"] = "".join(pieces)

    return replaced_count

# ==============================================================================
# Feature F-1: Inter-message content fingerprinting & deduplication
# ==============================================================================
def _get_conversation_base_id(messages: List[dict]) -> str:
    """
    Compute an identifier for the conversation that ``messages`` belongs to.

    The ID is the SHA-256 of the first string-valued system message plus the
    first string-valued user message with all known boilerplate tag blocks
    removed. Neither input changes when later turns are appended, so the ID
    stays the same across requests of one conversation.

    When both parts are empty, the hash of the whole message list is
    returned instead (that value changes every turn).
    """
    def first_text(role: str) -> str:
        # Content of the first message with this role and string content, else "".
        for entry in messages:
            if entry.get("role") == role and isinstance(entry.get("content"), str):
                return entry["content"]
        return ""

    system_text = first_text("system")

    # Reduce the first user message to its "core" by dropping every known
    # boilerplate tag pair together with its contents (crude but sufficient
    # for fingerprinting).
    user_text = first_text("user")
    for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
        tag_block = re.escape(open_tag) + r".*?" + re.escape(close_tag)
        user_text = re.sub(tag_block, "", user_text, flags=re.DOTALL)
    user_text = user_text.strip()

    identity = f"{system_text}\n{user_text}".strip()
    if identity:
        return hashlib.sha256(identity.encode()).hexdigest()
    # Nothing stable to hash: fall back to the full conversation hash.
    return _conv_hash(messages)

def deduplicate_user_message_segments(
    messages: List[dict],
    conv_id: str
) -> Tuple[List[dict], int]:
    """
    Remove repeated boilerplate segments from the user messages of a request.

    A segment is the text from a known opening tag to its closing tag
    (inclusive), identified by its SHA-256. Within one call the **first**
    occurrence of each segment is kept and every later identical occurrence
    is removed.

    Duplicates are deliberately judged per request, not against segments
    remembered from earlier requests: every request carries the whole
    conversation again, so stripping a segment because a *previous* request
    contained it would remove its only remaining copy.

    Newly seen segments are still recorded in ``_segment_fingerprints`` under
    ``conv_id`` (at most MAX_FINGERPRINT_HISTORY per conversation, oldest
    evicted first).

    Every string-valued user message is stripped of surrounding whitespace;
    one that ends up empty becomes "(no new content)". Messages are modified
    in place.

    Returns ``(messages, segments_removed)``.
    """
    segments_removed = 0
    seen_in_request = set()

    # The lock is held for the whole pass because the per-conversation
    # history dict is shared between request threads and mutated below.
    with _segment_fp_lock:
        history = _segment_fingerprints.setdefault(conv_id, {})
        # Monotonic insertion index; len(history) would repeat after pruning.
        next_index = max((entry[0] for entry in history.values()), default=-1) + 1

        for msg in messages:
            if msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if not isinstance(content, str):
                continue

            new_content = content
            for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
                # Scan all non-overlapping occurrences of this tag pair.
                idx = 0
                while True:
                    start = new_content.find(open_tag, idx)
                    if start == -1:
                        break
                    end = new_content.find(close_tag, start + len(open_tag))
                    if end == -1:
                        break
                    end += len(close_tag)
                    segment = new_content[start:end]
                    seg_hash = hashlib.sha256(segment.encode()).hexdigest()

                    if seg_hash in seen_in_request:
                        # Already present earlier in this request: drop it and
                        # rescan from the same position.
                        new_content = new_content[:start] + new_content[end:]
                        segments_removed += 1
                        idx = start
                        continue

                    seen_in_request.add(seg_hash)
                    if seg_hash not in history:
                        history[seg_hash] = (next_index, segment)
                        next_index += 1
                        if len(history) > MAX_FINGERPRINT_HISTORY:
                            oldest = min(history, key=lambda k: history[k][0])
                            del history[oldest]
                    idx = end

            new_content = new_content.strip()
            if new_content == "":
                new_content = "(no new content)"
            msg["content"] = new_content

    return messages, segments_removed

# ==============================================================================
# Conversation summarisation – with token-aware split & retries
# ==============================================================================
def _total_tokens(messages: List[dict]) -> int:
    """
    Estimate the total token count of a list of messages.

    Counts string ``content`` plus the JSON-encoded ``arguments`` of every
    tool call. Non-string content is not counted. ``tool_calls`` or
    ``function`` being null/absent is tolerated: clients commonly send
    ``"tool_calls": null``, which must not raise.
    """
    total = 0
    for m in messages:
        content = m.get("content")
        if isinstance(content, str):
            total += estimate_tokens(content)
        for tc in m.get("tool_calls") or []:
            func = tc.get("function") if isinstance(tc, dict) else None
            arguments = func.get("arguments", "") if isinstance(func, dict) else ""
            total += estimate_tokens(json.dumps(arguments))
    return total

def _summarise_messages_with_retry(summarise_payload: dict, api_key: str) -> str:
    """
    POST ``summarise_payload`` to the DeepSeek chat completions endpoint.

    The request is attempted up to SUMMARISE_MAX_RETRIES times; after each
    failed attempt except the last, the function sleeps for
    SUMMARISE_RETRY_BASE_SLEEP seconds, doubled per attempt.

    Returns the ``content`` of the first choice in the response.
    Raises RuntimeError if all attempts fail.
    """
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    failure = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        try:
            request = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=json.dumps(summarise_payload).encode(),
                headers=request_headers,
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=120) as response:
                parsed = json.loads(response.read().decode())
                return parsed["choices"][0]["message"]["content"]
        except Exception as exc:
            failure = exc
            if attempt >= SUMMARISE_MAX_RETRIES:
                # Last attempt: report and fall through to the raise below.
                logging.error("Summarisation failed after %d attempts", SUMMARISE_MAX_RETRIES)
                continue
            delay = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
            logging.warning("Summarisation attempt %d failed, retrying in %.1fs: %s",
                            attempt, delay, exc)
            time.sleep(delay)
    raise RuntimeError(f"Summarisation API call failed: {failure}")

def _system_message_text(content: Any) -> Optional[str]:
    """Return the text of a system message's content, or None if it has none.

    Accepts a plain string, or the list-of-parts form in which text is
    carried by ``{"type": "text", "text": ...}`` entries.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts = [
            part["text"]
            for part in content
            if isinstance(part, dict)
            and part.get("type") == "text"
            and isinstance(part.get("text"), str)
        ]
        if texts:
            return "\n".join(texts)
    return None


def _merge_system_messages(messages: List[dict]) -> Tuple[Optional[str], List[dict]]:
    """
    Split ``messages`` into merged system text and the non-system messages.

    Returns ``(merged, others)``: ``merged`` is the text of all system
    messages joined with newlines, in order (None if no system message
    carries text); ``others`` are the remaining messages in their original
    order. System messages given as a list of text parts are merged too,
    rather than being silently dropped. A message without a ``role`` key is
    treated as non-system.
    """
    systems: List[str] = []
    others: List[dict] = []
    for m in messages:
        if m.get("role") != "system":
            others.append(m)
            continue
        text = _system_message_text(m.get("content"))
        if text is not None:
            systems.append(text)
    merged = "\n".join(systems) if systems else None
    return merged, others

def _estimate_message_tokens(msg: dict) -> int:
    """Estimate the tokens of one message: string content plus tool-call arguments.

    Null/absent ``content``, ``tool_calls`` and ``function`` are tolerated;
    assistant tool-call messages normally carry ``"content": null``.
    """
    content = msg.get("content")
    tokens = estimate_tokens(content) if isinstance(content, str) else 0
    for tc in msg.get("tool_calls") or []:
        func = tc.get("function") if isinstance(tc, dict) else None
        arguments = func.get("arguments", "") if isinstance(func, dict) else ""
        tokens += estimate_tokens(json.dumps(arguments))
    return tokens


def _safe_split_index(non_system: List[dict], idx: int) -> int:
    """Move the proposed split ``idx`` so that no tool message is separated
    from the assistant message that requested it, and so that at least one
    message remains after the split.

    Returns the index of the first message to keep (0 = summarise nothing).
    """
    count = len(non_system)

    # Walk forward until the message at the split is neither a tool result
    # nor an assistant tool-call message directly followed by its results.
    split = idx
    while split < count:
        msg = non_system[split]
        if msg.get("role") == "tool":
            split += 1
            continue
        if (msg.get("role") == "assistant" and msg.get("tool_calls")
                and split + 1 < count
                and non_system[split + 1].get("role") == "tool"):
            split += 1
            while split < count and non_system[split].get("role") == "tool":
                split += 1
            continue
        break
    if split < count:
        return split

    # The forward walk consumed everything, which would summarise the latest
    # turn away. Walk backwards instead so the trailing message (together
    # with the assistant message owning any trailing tool results) is kept.
    split = min(idx, count - 1)
    while split > 0 and non_system[split].get("role") == "tool":
        split -= 1
    return split


def maybe_summarize(messages: List[dict], api_key: str,
                    max_context: int = MAX_CONTEXT,
                    ratio: float = SUMMARY_RATIO) -> Tuple[List[dict], bool]:
    """
    Summarise the oldest non-system messages when the token budget is exceeded.

    If the estimated total is at most ``max_context * ratio``, nothing is
    summarised: all system messages are merged into one leading system
    message (if there are none, ``messages`` itself is returned).

    Otherwise the oldest messages (at least half of them, and enough to
    cover the token excess where possible) are sent to the summarisation
    model and replaced by a summary appended to the single system message.
    The split never separates tool results from the assistant message that
    requested them, and at least the most recent message is always kept.
    If the summarisation call fails, a fixed truncation notice is used in
    place of the summary.

    The input list is not modified. Returns ``(new_messages, summarised)``;
    the result contains at most ONE system message.
    """
    threshold = int(max_context * ratio)
    total = sum(_estimate_message_tokens(m) for m in messages)
    merged_sys, non_system = _merge_system_messages(messages)

    if total <= threshold:
        if merged_sys is not None:
            return [{"role": "system", "content": merged_sys}] + non_system, False
        return messages, False

    prefix = [{"role": "system", "content": merged_sys}] if merged_sys else []
    if not non_system:
        return prefix, False

    # Token-aware split: take messages from the start until the excess
    # (total - threshold) is covered AND at least half of the messages are
    # included. If the excess is never covered, fall back to half.
    deficit = total - threshold
    half = len(non_system) // 2
    idx = max(1, half)
    accumulated = 0
    for i, msg in enumerate(non_system):
        accumulated += _estimate_message_tokens(msg)
        if accumulated >= deficit and i >= half:
            idx = i + 1  # summarise up to and including this message
            break

    idx = _safe_split_index(non_system, idx)
    to_summarise = non_system[:idx]
    to_keep = non_system[idx:]

    if not to_summarise:
        return prefix + non_system, False

    summary_prompt = (
        "Summarise the following conversation excerpt. "
        "Retain all critical facts, decisions, and code fragments. "
        "Be concise but complete.\n\n"
    )
    # Only string content is included in the excerpt.
    summarise_text = "".join(
        f"[{m.get('role', 'unknown')}]: {m['content']}\n"
        for m in to_summarise
        if isinstance(m.get("content"), str)
    )

    payload = {
        "model": SUMMARY_MODEL,
        "messages": [
            {"role": "user", "content": summary_prompt + summarise_text}
        ],
        "max_tokens": 1000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }

    try:
        summary = _summarise_messages_with_retry(payload, api_key)
    except RuntimeError as e:
        logging.warning(f"Summarisation failed, falling back to truncation: {e}")
        summary = "[Earlier conversation truncated due to length]"

    if merged_sys:
        system_content = merged_sys + "\n\n[Earlier conversation summary]\n" + summary
    else:
        system_content = f"[Earlier conversation summary]\n{summary}"
    new_messages: List[dict] = [{"role": "system", "content": system_content}]
    new_messages.extend(to_keep)
    return new_messages, True

# ==============================================================================
# Reasoning injection helpers
# ==============================================================================
def cache_assistant_message(original_msgs: List[dict], assistant_msg: dict):
    """
    Remember ``assistant_msg`` so its reasoning can be re-attached on later turns.

    Messages with neither tool calls nor reasoning are not cached. The cache
    key is the hash of ``original_msgs`` followed by the assistant message
    without its ``reasoning_content``; the stored value is the full message.
    """
    has_payload = assistant_msg.get("tool_calls") or assistant_msg.get("reasoning_content")
    if not has_payload:
        return

    without_reasoning = {
        field: value
        for field, value in assistant_msg.items()
        if field != "reasoning_content"
    }
    conversation = [m.copy() for m in original_msgs] + [without_reasoning]
    _assistant_cache.set(_conv_hash(conversation), assistant_msg)

def inject_reasoning(messages: List[dict]):
    """
    Re-attach cached ``reasoning_content`` to tool-call assistant messages.

    For every assistant message that has tool calls but no
    ``reasoning_content``, the conversation prefix ending at that message is
    hashed and looked up in the assistant cache; on a hit carrying reasoning,
    it is copied onto the message in place.
    """
    for position, msg in enumerate(messages):
        needs_reasoning = (
            msg.get("role") == "assistant"
            and msg.get("tool_calls")
            and "reasoning_content" not in msg
        )
        if not needs_reasoning:
            continue
        cached = _assistant_cache.get(_conv_hash(messages[:position + 1]))
        if cached and "reasoning_content" in cached:
            msg["reasoning_content"] = cached["reasoning_content"]

def should_disable_thinking(messages: List[dict]) -> bool:
    """
    Tell whether thinking should be disabled for this conversation.

    True as soon as one assistant message has tool calls but no
    ``reasoning_content`` key; False otherwise.
    """
    for m in messages:
        if m.get("role") != "assistant":
            continue
        if m.get("tool_calls") and "reasoning_content" not in m:
            return True
    return False

# ==============================================================================
# DeepSeek API helpers
# ==============================================================================
def _make_deepseek_request(payload: dict, stream: bool) -> urllib.request.Request:
    """
    Build the POST request for the DeepSeek chat completions endpoint.

    ``payload`` is sent as the JSON body. The Accept header asks for
    server-sent events when ``stream`` is true and for JSON otherwise.
    """
    if stream:
        accept = "text/event-stream"
    else:
        accept = "application/json"
    encoded_body = json.dumps(payload).encode()
    return urllib.request.Request(
        f"{DEEPSEEK_BASE}/chat/completions",
        data=encoded_body,
        headers={
            "Authorization": f"Bearer {DSEEK_KEY}",
            "Content-Type": "application/json",
            "Accept": accept,
        },
        method="POST",
    )

def deepseek_nonstream(payload: dict) -> dict:
    """
    Send ``payload`` as a non-streaming request and return the decoded JSON response.

    Raises RuntimeError (carrying the status code and response body) when
    the server answers with an HTTP error.
    """
    request = _make_deepseek_request(payload, stream=False)
    try:
        with urllib.request.urlopen(request, timeout=600) as response:
            raw = response.read()
            return json.loads(raw.decode())
    except urllib.error.HTTPError as e:
        if e.fp:
            error_body = e.read().decode()
        else:
            error_body = ""
        raise RuntimeError(f"DeepSeek HTTP {e.code}: {error_body}") from e

# ==============================================================================
# Streaming buffer for accumulating tool calls
# ==============================================================================
class StreamBuffer:
    """
    Accumulates the pieces of a streamed chat completion.

    Feed every decoded chunk to ``process_chunk``; afterwards
    ``build_assistant_message`` returns the complete assistant message.
    """

    def __init__(self):
        self.reasoning = ""                       # concatenated reasoning_content deltas
        self.content = ""                         # concatenated content deltas
        self.tool_calls: Dict[int, dict] = {}     # tool call index -> assembled call
        self.finish_reason: Optional[str] = None  # last non-null finish_reason seen
        self.usage: Optional[dict] = None         # last non-null usage object seen

    def process_chunk(self, chunk: dict) -> None:
        """
        Merge one streamed chunk into the buffer.

        Null fields are treated as "nothing new". A delta carries only one of
        ``reasoning_content`` / ``content`` at a time and may send the other
        as null, so a null must not wipe what has been accumulated so far.
        Null ``delta``, ``tool_calls``, ``message`` and ``usage`` values are
        tolerated as well.
        """
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}

            reasoning_piece = delta.get("reasoning_content")
            if reasoning_piece is not None:
                self.reasoning += reasoning_piece

            content_piece = delta.get("content")
            if content_piece is not None:
                self.content += content_piece

            for tc in delta.get("tool_calls") or []:
                self._merge_tool_call(tc)

            # Some chunks may carry a complete message instead of a delta;
            # its fields replace what was accumulated.
            message = choice.get("message")
            if isinstance(message, dict):
                rc_msg = message.get("reasoning_content")
                ct_msg = message.get("content")
                if rc_msg is not None:
                    self.reasoning = rc_msg
                if ct_msg is not None:
                    self.content = ct_msg

            finish = choice.get("finish_reason")
            if finish is not None:
                self.finish_reason = finish

        usage = chunk.get("usage")
        if usage is not None:
            self.usage = usage

    def _merge_tool_call(self, tc: dict) -> None:
        """Merge one tool-call delta into the call with the same ``index``.

        ``id``, ``type`` and the function ``name`` overwrite the stored value
        when present; ``arguments`` fragments are concatenated. Deltas
        without an index are ignored.
        """
        idx = tc.get("index")
        if idx is None:
            return
        cur = self.tool_calls.setdefault(idx, {
            "id": "",
            "type": "function",
            "function": {"name": "", "arguments": ""},
        })
        tid = tc.get("id")
        if tid is not None:
            cur["id"] = tid
        ttype = tc.get("type")
        if ttype is not None:
            cur["type"] = ttype
        func_raw = tc.get("function")
        func = func_raw if isinstance(func_raw, dict) else {}
        name = func.get("name")
        if name is not None:
            cur["function"]["name"] = name
        args = func.get("arguments")
        if args is not None:
            cur["function"]["arguments"] += args

    def build_assistant_message(self) -> dict:
        """Return the assembled assistant message.

        ``reasoning_content``, ``content`` and ``tool_calls`` are included
        only when non-empty; tool calls are ordered by their index.
        """
        msg: Dict[str, Any] = {"role": "assistant"}
        if self.reasoning:
            msg["reasoning_content"] = self.reasoning
        if self.content:
            msg["content"] = self.content
        if self.tool_calls:
            msg["tool_calls"] = [self.tool_calls[k] for k in sorted(self.tool_calls)]
        return msg

# ==============================================================================
# HTTP Request Handler
# ==============================================================================
class ProxyHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler serving ``POST /v1/chat/completions``.

    Each request body is validated, run through the message-compression
    pipeline, turned into an upstream payload with the globally forced
    model / token / thinking settings, and then dispatched either as a
    streaming (SSE) or a non-streaming upstream call. ``reasoning_content``
    is removed from everything sent back to the client.
    """

    def log_message(self, format, *args):
        """Send the base class's access/error log lines through ``logging``."""
        logging.info("%s - %s", self.client_address[0], format % args)

    # ------------------------------------------------------------------
    # Request entry point
    # ------------------------------------------------------------------
    def do_POST(self):
        """Handle one POST request.

        Responds with 404 for any path other than ``/v1/chat/completions``,
        400 for a malformed body or ``messages`` array, 502 when a
        ``RuntimeError`` is raised while processing or dispatching the
        request, and 500 for any other failure.
        """
        if self.path != "/v1/chat/completions":
            self.send_error(404)
            return

        client_req = self._read_json_object()
        if client_req is None:
            return  # an error response has already been sent

        messages = client_req.get("messages", [])
        # --- Early validation: messages must be a non-empty list ---
        if not isinstance(messages, list) or not messages:
            self.send_error(400, "Empty or invalid 'messages' array")
            return
        # Later pipeline steps index messages by "role", so reject anything
        # that is not an object carrying a string role up front.
        if not all(
            isinstance(m, dict) and isinstance(m.get("role"), str)
            for m in messages
        ):
            self.send_error(400, "Each message must be an object with a 'role'")
            return

        stream = client_req.get("stream", False)

        # Pre-processing and dispatch share one error boundary so that a
        # failure in either produces an HTTP error instead of a dropped
        # connection.
        try:
            payload, original_messages = self._build_payload(
                client_req, messages, stream
            )
            if stream:
                self._handle_stream(payload, original_messages)
            else:
                self._handle_nonstream(payload, original_messages)
        except RuntimeError as e:
            logging.error("Upstream error: %s", e)
            # The detail goes into the body (explain), not the reason
            # phrase: the reason phrase is written into the status line
            # and must stay a short, single-line latin-1 string.
            self._send_error_best_effort(502, "Upstream error", str(e))
        except Exception:
            logging.exception("Unexpected proxy error")
            self._send_error_best_effort(500, "Internal proxy error")

    # ------------------------------------------------------------------
    # Helpers for do_POST
    # ------------------------------------------------------------------
    def _send_error_best_effort(
        self, code: int, message: str, explain: Optional[str] = None
    ) -> None:
        """Try to send an error response; log (never raise) if that fails."""
        try:
            self.send_error(code, message, explain)
        except Exception:
            logging.debug(
                "Could not deliver error response %s to client", code, exc_info=True
            )

    def _read_json_object(self) -> Optional[dict]:
        """Read the request body and parse it as a JSON object.

        Returns the parsed dict, or ``None`` after sending a 400 response
        when the Content-Length header is missing/invalid, the body is
        empty, the body is not valid JSON, or the JSON value is not an
        object.
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self.send_error(400, "Invalid Content-Length")
            return None
        if content_length < 0:
            # A negative size would make rfile.read() read until EOF.
            self.send_error(400, "Invalid Content-Length")
            return None
        if not content_length:
            self.send_error(400, "Empty body")
            return None

        body = self.rfile.read(content_length)
        try:
            client_req = json.loads(body)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (both are
            # ValueError subclasses).
            self.send_error(400, "Invalid JSON")
            return None
        if not isinstance(client_req, dict):
            self.send_error(400, "Request body must be a JSON object")
            return None
        return client_req

    def _compress_system_prompts(self, messages) -> int:
        """Replace string system prompts with their summarized form.

        A cached summary is used when ``load_summarized_prompt`` returns one;
        otherwise a summary is requested via ``summarize_system_prompt``. If
        that raises ``RuntimeError`` or yields an empty result, the original
        prompt is kept. Returns the number of prompts that were replaced.
        """
        compressed = 0
        for msg in messages:
            if not isinstance(msg, dict) or msg.get("role") != "system":
                continue
            sys_content = msg.get("content")
            if not isinstance(sys_content, str):
                continue

            h = _stable_hash({"content": sys_content})
            # Save original if not already saved
            if SAVE_PREPOST_SYSTEM:
                save_original_prompt(h, sys_content)

            # Check for cached summarized version
            summarized_sys = load_summarized_prompt(h)
            if summarized_sys:
                logging.debug("Using summarized system prompt for hash %s", h[:12])
            else:
                # Generate summary via DeepSeek API
                try:
                    logging.info("Generating summarized system prompt for hash %s", h[:12])
                    summarized_sys = summarize_system_prompt(sys_content, DSEEK_KEY)
                    if summarized_sys and SAVE_PREPOST_SYSTEM:
                        save_summarized_prompt(h, summarized_sys)
                except RuntimeError as e:
                    logging.warning("Failed to summarize system prompt, using original: %s", e)
                    continue
                if not summarized_sys:
                    # Never swap a real prompt for an empty one.
                    logging.warning(
                        "Summarizer returned an empty system prompt for hash %s, using original",
                        h[:12],
                    )
                    continue

            msg["content"] = summarized_sys
            compressed += 1
        return compressed

    def _build_payload(self, client_req: dict, messages: list, stream) -> Tuple[dict, list]:
        """Run the compression pipeline and build the upstream payload.

        Returns ``(payload, original_messages)`` where ``original_messages``
        is a deep copy of the client's messages taken before any pipeline
        step runs. The payload starts as a shallow copy of the client
        request with ``messages``, ``model``, ``max_tokens``, ``thinking``
        and ``stream`` overwritten.
        """
        # Deep copy before any in-place mutations
        original_messages = copy.deepcopy(messages)

        # ---------- Pre-processing metrics ----------
        original_msg_count = len(original_messages)
        original_tokens = _total_tokens(original_messages)
        msg_hash = _md5_of_messages(original_messages)

        # Save original message for developer introspection
        if SAVE_PREPOST_MSGS:
            _save_json_message(PRE_MSG_DIR, "pre", original_messages, msg_hash)

        # ---- Pipeline ----
        # 1. Markdown block deduplication (return value is used as a count;
        #    presumably edits `messages` in place - confirm in its definition)
        replaced_blocks = deduplicate_markdown_blocks(messages)

        # 2. Inter-message content fingerprinting & deduplication (Feature F-1)
        conv_id = _get_conversation_base_id(original_messages)
        messages, deduped_segments = deduplicate_user_message_segments(messages, conv_id)

        # 3. Conversation summarisation (token-aware split)
        messages, summarized = maybe_summarize(
            messages,
            api_key=DSEEK_KEY,
            max_context=MAX_CONTEXT,
            ratio=SUMMARY_RATIO,
        )

        # 4. System prompt compression - auto-summarize & cache
        compressed_prompts_found = self._compress_system_prompts(messages)

        # 5. Reasoning injection
        inject_reasoning(messages)

        # 6. Build final payload
        payload = dict(client_req)
        payload["messages"] = messages
        payload["model"] = DEFAULT_MODEL
        payload["max_tokens"] = MAX_OUTPUT_TOKENS

        if THINKING_MODE in ("enabled", "disabled"):
            thinking_type = THINKING_MODE
        else:
            thinking_type = "disabled" if should_disable_thinking(messages) else "enabled"
        payload["thinking"] = {"type": thinking_type}

        payload["stream"] = stream

        # ---------- Post-processing metrics ----------
        final_msg_count = len(messages)
        final_tokens = _total_tokens(messages)
        compression_pct = (1 - final_tokens / original_tokens) * 100 if original_tokens > 0 else 0.0

        if SAVE_PREPOST_MSGS:
            _save_json_message(POST_MSG_DIR, "post", messages, msg_hash)

        logging.info(
            f"REQ {msg_hash} | msgs: {original_msg_count} → {final_msg_count} "
            f"| tokens: {original_tokens} → {final_tokens} ({compression_pct:+.1f}%) "
            f"| blocks dedup'd: {replaced_blocks} "
            f"| dedup'd segments: {deduped_segments} "
            f"| summarized: {'yes' if summarized else 'no'} "
            f"| compressed prompts: {compressed_prompts_found} "
            f"| stream: {stream} "
            f"| thinking: {payload['thinking']['type']}"
        )
        return payload, original_messages

    # ------------------------------------------------------------------
    # Upstream dispatch
    # ------------------------------------------------------------------
    def _handle_nonstream(self, payload, original_msgs):
        """Perform a non-streaming upstream call and relay the JSON reply.

        The first choice's message is passed to ``cache_assistant_message``
        (including its reasoning), and ``reasoning_content`` is removed from
        every choice before the response is written to the client. Sends 502
        when the upstream reply has no choices.
        """
        resp = deepseek_nonstream(payload)

        choices = resp.get("choices") if isinstance(resp, dict) else None
        if not choices:
            logging.error("DeepSeek returned empty choices array")
            self.send_error(502, "Empty response from upstream")
            return

        choice = choices[0]
        upstream_msg = choice.get("message") or {}

        # Cache a private copy so that stripping the reasoning for the
        # client below cannot alter what was handed to the cache (in case
        # the cache keeps a reference to the dict it receives).
        cache_assistant_message(original_msgs, dict(upstream_msg))

        choice["message"] = {
            key: value for key, value in upstream_msg.items() if key != "reasoning_content"
        }
        # Match the streaming path, which strips reasoning from every choice.
        for extra in choices[1:]:
            if isinstance(extra, dict) and isinstance(extra.get("message"), dict):
                extra["message"].pop("reasoning_content", None)

        body = json.dumps(resp).encode()
        try:
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except OSError as e:
            # BrokenPipeError / ConnectionResetError are OSError subclasses.
            logging.warning("Client disconnected while sending non‑stream response: %s", e)

    def _handle_stream(self, payload, original_msgs):
        """Open a streaming upstream call and relay it to the client as SSE.

        Every ``data:`` line carrying JSON is fed to a ``StreamBuffer`` and
        re-emitted with ``reasoning_content`` removed; other lines are
        forwarded as received. When the loop ends without a ``[DONE]``
        marker (and without an error) one is appended. Afterwards the
        buffered assistant message is cached if it has tool calls or
        reasoning. A failure to open the upstream connection yields a 502.
        """
        req = _make_deepseek_request(payload, stream=True)

        # Open the connection separately from the relay loop so the 502
        # below can only ever be sent before any response headers.
        try:
            upstream = urllib.request.urlopen(req, timeout=600)
        except (urllib.error.HTTPError, urllib.error.URLError) as e:
            logging.error("Upstream connection error: %s", e)
            error_detail = ""
            if isinstance(e, urllib.error.HTTPError):
                try:
                    error_detail = e.read().decode(errors="replace")
                except Exception:
                    logging.debug("Could not read upstream error body", exc_info=True)
            # Detail goes in the body; the reason phrase must stay a short
            # single-line latin-1 string.
            self.send_error(502, "Upstream error", explain=error_detail or str(e))
            return

        buffer = StreamBuffer()
        with upstream:
            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            self.end_headers()

            done = False
            try:
                for line in upstream:
                    line_str = line.decode() if isinstance(line, bytes) else line
                    if not line_str.startswith("data:"):
                        # Non-data lines are passed through as received.
                        self.wfile.write(line_str.encode() + b"\n")
                        self.wfile.flush()
                        continue

                    data_part = line_str[5:].strip()
                    if data_part == "[DONE]":
                        self.wfile.write(b"data: [DONE]\n\n")
                        self.wfile.flush()
                        done = True
                        break
                    try:
                        chunk = json.loads(data_part)
                    except json.JSONDecodeError:
                        # Not JSON: forward the raw line untouched.
                        self.wfile.write(line_str.encode() + b"\n")
                        self.wfile.flush()
                        continue

                    # Buffer first so the reasoning is captured before it
                    # is stripped from the chunk sent to the client.
                    buffer.process_chunk(chunk)

                    for choice in chunk.get("choices", []):
                        if "delta" in choice:
                            choice["delta"].pop("reasoning_content", None)
                        if "message" in choice:
                            choice["message"].pop("reasoning_content", None)

                    self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode())
                    self.wfile.flush()
            except OSError as e:
                # Raised by client writes (disconnect) as well as by
                # upstream socket reads (e.g. timeout).
                logging.warning(
                    "Stream aborted by I/O error (client disconnect or upstream read failure): %s",
                    e,
                )
                done = True
            except Exception:
                logging.exception("Unexpected error while streaming response")
                done = True

            if not done:
                try:
                    self.wfile.write(b"data: [DONE]\n\n")
                    self.wfile.flush()
                except OSError:
                    logging.debug("Client gone before final [DONE] could be sent")

        assistant_msg = buffer.build_assistant_message()
        if assistant_msg.get("tool_calls") or assistant_msg.get("reasoning_content"):
            # The response has already been streamed; a cache failure must
            # not surface as an HTTP error appended to the stream.
            try:
                cache_assistant_message(original_msgs, assistant_msg)
            except Exception:
                logging.exception("Failed to cache streamed assistant message")

# ==============================================================================
# Main – with fixed Ctrl+C handling (avoid deadlock) and dev directories
# ==============================================================================
def main():
    """Parse CLI options, apply them to the module globals and run the server.

    Overrides ``MAX_CONTEXT``, ``SUMMARY_RATIO``, ``MAX_OUTPUT_TOKENS`` and
    ``THINKING_MODE`` from the command line, creates the on-disk dump/cache
    directories that are enabled, then serves requests with a threaded TCP
    server until SIGINT or SIGTERM is received.
    """
    global MAX_CONTEXT, SUMMARY_RATIO, MAX_OUTPUT_TOKENS, THINKING_MODE

    parser = argparse.ArgumentParser(description="DeepSeek V4 Flash OpenAI Proxy (globals forced)")
    parser.add_argument("--port", type=int, default=8080, help="Listening port")
    parser.add_argument("--host", default="0.0.0.0", help="Bind address")
    parser.add_argument("--max-context", type=int, default=128000, help="Max context tokens")
    parser.add_argument("--summarize-ratio", type=float, default=0.8,
                        help="Trigger summarisation when tokens exceed ratio * max-context")
    parser.add_argument("--disable-compression", action="store_true",
                        help="Do not auto-summarize system prompts (use originals as-is)")
    parser.add_argument("--max-output-tokens", type=int, default=128000,
                        help="Force this many max tokens for generation (overrides client)")
    parser.add_argument("--thinking", choices=["enabled", "disabled", "auto"], default="auto",
                        help="Force thinking mode (default: auto)")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s"
    )

    # NOTE(review): --disable-compression is only consulted here, to skip
    # creating the prompt cache directories. Nothing in this function passes
    # it on to request handling, so the handler still summarizes system
    # prompts; honouring the flag needs a shared setting the handler reads.
    if not args.disable_compression and SAVE_PREPOST_SYSTEM:
        _ensure_sys_dirs()

    MAX_CONTEXT = args.max_context
    SUMMARY_RATIO = args.summarize_ratio
    MAX_OUTPUT_TOKENS = args.max_output_tokens
    THINKING_MODE = args.thinking

    # Ensure developer dump directories exist
    if SAVE_PREPOST_MSGS:
        PRE_MSG_DIR.mkdir(parents=True, exist_ok=True)
        POST_MSG_DIR.mkdir(parents=True, exist_ok=True)

    class _ProxyServer(socketserver.ThreadingTCPServer):
        # Allow rebinding right after a restart instead of failing with
        # "Address already in use" while old sockets sit in TIME_WAIT.
        allow_reuse_address = True
        # Do not let in-flight request threads block interpreter exit.
        daemon_threads = True

    server = _ProxyServer((args.host, args.port), ProxyHandler)

    # ---- Graceful shutdown WITHOUT deadlocking the main thread ----
    # Python-level signal handlers always run in the main thread, so a plain
    # flag is enough to make shutdown idempotent. A non-reentrant lock here
    # could deadlock if a second signal's handler ran while the first one
    # still held it.
    shutting_down = False

    def _shutdown(signum, frame):
        nonlocal shutting_down
        if shutting_down:
            return
        shutting_down = True
        logging.info("Received signal %s, shutting down.", signum)
        # server.shutdown() blocks until serve_forever() returns, and
        # serve_forever() runs in this (main) thread - so it must be
        # called from a different thread.
        threading.Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    logging.info(f"Proxy listening on {args.host}:{args.port}")
    logging.info(f"Forced model: {DEFAULT_MODEL}, max_tokens: {MAX_OUTPUT_TOKENS}, thinking: {THINKING_MODE}")
    logging.info(f"Pre/post message dumps: {PRE_MSG_DIR} / {POST_MSG_DIR}")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Kept as a safety net; SIGINT is normally handled by _shutdown.
        pass
    finally:
        server.server_close()
        logging.info("Server stopped.")

# Start the proxy only when this file is executed as a script (not on import).
if __name__ == "__main__":
    main()
