It was 2 a.m. when the alert call jolted me awake: our production Agent had suffered “amnesia” for three consecutive conversations. The context the user had carefully built was gone, and complaints were flooding in. Squinting at the logs, I discovered that the rollback method in the memory management module had been broken by an innocuous-looking code refactor. The rollback did not just undo the erroneous operation; it also wiped out the entire conversation history. Worse still, our existing unit tests never caught the bug: they always started from a fresh empty database and could never cover a cross-session scenario like “roll back dirty data to a previous snapshot.” I spent three hours debugging, manually simulating intermediate states, before I finally pinpointed the root cause. That’s when it hit me: we weren't lacking tests, we were missing snapshot tests that capture the entire “memory state.”
Our LLM memory system uses SQLite for local persistence. Each session owns a table that stores conversation turns, vector summaries, and tool-call records. Two critical operations are:
- save_snapshot(session_id): serializes the full state of a session into the snapshots table, creating a rollback checkpoint.
- rollback_to_snapshot(session_id, snapshot_id): when something goes wrong, it rebuilds the session table from a snapshot and discards all changes made after that point.
This mechanism had been running smoothly until a refactor I made changed the transaction boundaries inside the rollback logic. After the rollback executed, the conversations table was rebuilt just fine, but the snapshots table itself was accidentally wiped out. The next rollback attempt couldn’t find any previous checkpoints.
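The failure described above can be phrased as an invariant: a rollback may rewrite rows in conversations, but it must leave snapshots untouched so that a second rollback still finds its checkpoint. Here is a minimal sketch of that invariant, assuming a stripped-down two-table schema and hypothetical helper functions (make_db, snapshot_count, and the simplified save/rollback functions are illustrations, not our production code):

```python
import json
import sqlite3


def make_db() -> sqlite3.Connection:
    # In-memory database with an assumed, simplified schema.
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE conversations (session_id TEXT, turn INTEGER, content TEXT);
        CREATE TABLE snapshots (
            snapshot_id TEXT PRIMARY KEY, session_id TEXT, state_json TEXT
        );
    """)
    return conn


def save_snapshot(conn, session_id, snapshot_id):
    # Serialize the session's current turns into one snapshots row.
    rows = conn.execute(
        "SELECT turn, content FROM conversations WHERE session_id = ? ORDER BY turn",
        (session_id,),
    ).fetchall()
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT INTO snapshots VALUES (?, ?, ?)",
            (snapshot_id, session_id, json.dumps(rows)),
        )


def rollback_to_snapshot(conn, session_id, snapshot_id):
    row = conn.execute(
        "SELECT state_json FROM snapshots WHERE snapshot_id = ? AND session_id = ?",
        (snapshot_id, session_id),
    ).fetchone()
    if row is None:
        raise LookupError(f"no snapshot {snapshot_id!r} for session {session_id!r}")
    turns = json.loads(row[0])
    # One transaction that touches conversations only; snapshots is never written.
    with conn:
        conn.execute("DELETE FROM conversations WHERE session_id = ?", (session_id,))
        conn.executemany(
            "INSERT INTO conversations VALUES (?, ?, ?)",
            [(session_id, turn, content) for turn, content in turns],
        )


def snapshot_count(conn, session_id):
    return conn.execute(
        "SELECT COUNT(*) FROM snapshots WHERE session_id = ?", (session_id,)
    ).fetchone()[0]


# Usage: the checkpoint must survive the rollback, so rolling back twice works.
conn = make_db()
with conn:
    conn.execute("INSERT INTO conversations VALUES ('s1', 1, 'hello')")
save_snapshot(conn, "s1", "snap-1")
with conn:
    conn.execute("INSERT INTO conversations VALUES ('s1', 2, 'dirty write')")
rollback_to_snapshot(conn, "s1", "snap-1")
assert snapshot_count(conn, "s1") == 1
rollback_to_snapshot(conn, "s1", "snap-1")  # would raise if snapshots were wiped
```

Asserting on the snapshots table after every rollback is the cheap part; it would have flagged my refactor even in an in-memory test.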
Why didn’t traditional unit tests catch this? Because the typical test flow looks like this:
def test_rollback():
    db = create_in_memory_db()
    db.save_snapshot("s1")
    db.rollback_to_snapshot("s1", ...)
    assert db.get_conversation("s1") == expected
Everything runs in a single process, inside a single temporary database. The production scenario was different: process A saves a snapshot and exits, then process B reopens the same database file and performs the rollback. File-level persistent state, WAL checkpointing, and even the visibility of the snapshots table across different connections: none of that was tested. To put it bluntly, we tested the “logic” but never tested the “storage.”
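To make the gap concrete, here is a rough sketch of a check that goes through a real file and two separate connections. The function names and the one-table schema are assumptions for illustration. Both "sessions" still run inside one Python process, so this only approximates the process A / process B scenario; launching the writer through subprocess would be closer to production.

```python
import os
import sqlite3
import tempfile


def writer_session(db_path: str) -> None:
    """Stands in for process A: write a snapshot row, commit, close."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS snapshots ("
            "snapshot_id TEXT PRIMARY KEY, session_id TEXT NOT NULL)"
        )
        conn.execute("INSERT INTO snapshots VALUES (?, ?)", ("snap-1", "s1"))
        conn.commit()
    finally:
        conn.close()


def reader_session(db_path: str) -> list:
    """Stands in for process B: a fresh connection that sees only what is on disk."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            "SELECT snapshot_id FROM snapshots WHERE session_id = ?", ("s1",)
        ).fetchall()
    finally:
        conn.close()


def run_cross_connection_check() -> list:
    # A file-backed database, unlike ":memory:", outlives the first connection.
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = os.path.join(tmp_dir, "memory.sqlite")
        writer_session(db_path)
        return reader_session(db_path)
```

The point of the split is that nothing is shared between the two functions except the path, so any state that lives only in a connection or an uncommitted transaction simply disappears.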
I decided to bring in snapshot testing, but instead of using text-based snapshots, I would treat the SQLite database file itself as an immutable artifact.
Comparison of approaches:
tmp_path - manual comparison
The architectural idea: provide a snapshot_db fixture via conftest.py that:
- makes sure a baseline database file (tests/snapshots/memory_test.sqlite) exists before the test starts;
- writes a fresh baseline when asked to (the --snapshot-update flag), in which case the test passes immediately.
With this approach, our tests truly simulate a “cross-process, cross-connection” persistence effect: each test case receives an independent copy of a database file, performs its operations, and then the entire file state is compared against the expected outcome.
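The two building blocks behind that idea, copying a baseline file and comparing whole-database state, can be sketched with the standard library alone. This is not the actual snapshot_db fixture: working_copy and dump_state are hypothetical helper names, and the comparison is done on table contents rather than raw bytes, because two SQLite files holding the same rows are not guaranteed to be byte-identical.

```python
import shutil
import sqlite3
from pathlib import Path


def working_copy(baseline: Path, work_dir: Path) -> Path:
    """Copy the baseline database so a test never mutates the original.

    Assumes the baseline was closed cleanly, so no -wal sidecar file
    holds data that the main file lacks.
    """
    target = work_dir / baseline.name
    shutil.copy2(baseline, target)
    return target


def dump_state(db_path: Path) -> dict:
    """Return {table_name: sorted rows} for every user table in the file."""
    conn = sqlite3.connect(str(db_path))
    try:
        tables = [
            name
            for (name,) in conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
            )
        ]
        # key=repr keeps sorting stable even when rows mix NULLs and text.
        return {
            table: sorted(conn.execute(f'SELECT * FROM "{table}"').fetchall(), key=repr)
            for table in tables
        }
    finally:
        conn.close()
```

Inside a pytest fixture, work_dir would typically be tmp_path, and the final assertion would compare dump_state of the working copy against dump_state of an expected database file.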
The code below clarifies what we intend to test. MemoryManager wraps the SQLite connection, snapshot saving, and rollback; it is a simplified version of what we use in production.
import sqlite3
import uuid
from datetime import datetime, timezone


class MemoryManager:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_tables()

    def _get_conn(self) -> sqlite3.Connection:
        conn = sqlite3.connect(self.db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        return conn

    def _init_tables(self):
        with self._get_conn() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS conversations (
                    session_id TEXT NOT NULL,
                    turn INTEGER NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    PRIMARY KEY (session_id, turn)
                );
                CREATE TABLE IF NOT EXISTS snapshots (
                    snapshot_id TEXT PRIMARY KEY,
                    session_id TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    state_json TEXT NOT NULL
                );
            """)

    def add_message(self, session_id: str, role: str, content: str):
        with self._get_conn() as conn:
            turn = conn.execute(
                "SELECT COALESCE(MAX(turn), 0) + 1 FROM conversations WHERE session_id = ?",