cd /news/ai-safety/lighthouse-v1-7-py Β· home β€Ί topics β€Ί ai-safety β€Ί article
[ARTICLE Β· art-17054] src=gist.github.com pub= topic=ai-safety verified=true sentiment=Β· neutral

lighthouse_v1_7.py

The PUBLIC LIGHTHOUSE CORE v1.7 runtime has been released, adding two new sublayers to Layer 3 (State Validation): the Orientation State Register (OSR) for spatial frame and basis tracking, and Narrative State Validation (NSV) for interpretation drift tracking. The update introduces new reason codes for detecting orientation frame drift, basis mismatches, and narrative reinterpretation issues, while leaving all other modules and the aggregator unchanged from v1.6. The system can be run with six demo modes including clean, overclaim, spatial_ambiguous, spatial_locked, narrative_drift, and narrative_retro.

read33 min publishedMay 26, 2026

| #!/usr/bin/env python3 | | | """ | | | PUBLIC LIGHTHOUSE CORE v1.7 | | | AI Output Governance and Validation Runtime | | | Adds two new sublayers to Layer 3 (State Validation): | | | 3A State Validation (drift scoring) β€” unchanged from v1.6 | | | 3B Orientation State Register (OSR) β€” spatial frame / basis tracking | | | 3C Narrative State Validation (NSV) β€” interpretation drift tracking | | | All other modules and the aggregator are unchanged from v1.6. | | | Run: | |

| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo clean --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo overclaim --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_ambiguous --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_locked --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_drift --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_retro --pretty | |

| """ | | | from future import annotations | | | import argparse | | | from dataclasses import dataclass, field, asdict | | | from datetime import datetime, timezone | | | from enum import Enum | | | import json | | | from typing import Any, Dict, List, Optional | | | import uuid | | | SYSTEM_VERSION = "PUBLIC_LIGHTHOUSE_CORE_1.7.0" | | | REASON_REGISTRY_VERSION = "1.1.0" | | | # ── Decision types ───────────────────────────────────────────────────────── | | | class FinalDecision(str, Enum): | | | AUTHORIZE = "AUTHORIZE" | | | CONSTRAIN = "CONSTRAIN" | | | REVISE = "REVISE" | | | HOLD = "HOLD" | | | CLARIFY = "CLARIFY" | | | HALT = "HALT" | | | BLOCK = "BLOCK" | | | FINAL_PRIORITY = { | | | FinalDecision.BLOCK: 7, | | | FinalDecision.HALT: 6, | | | FinalDecision.CLARIFY: 5, | | | FinalDecision.HOLD: 4, | | | FinalDecision.REVISE: 3, | | | FinalDecision.CONSTRAIN: 2, | | | FinalDecision.AUTHORIZE: 1, | | | } | | | # ── Reason codes ─────────────────────────────────────────────────────────── | | | class ReasonCode: | | | # Preflight | | | PREFLIGHT_PURPOSE_MISSING = "PREFLIGHT_PURPOSE_MISSING" | | | PREFLIGHT_BIASED_FRAME = "PREFLIGHT_BIASED_FRAME" | | | PREFLIGHT_FORCED_CONCLUSION = "PREFLIGHT_FORCED_CONCLUSION" | | | # Execution Control | | | EXECUTION_AGENT_UNVERIFIABLE = "EXECUTION_AGENT_UNVERIFIABLE" | | | EXECUTION_SCOPE_VIOLATION = "EXECUTION_SCOPE_VIOLATION" | | | EXECUTION_PERMISSION_MISSING = "EXECUTION_PERMISSION_MISSING" | | | EXECUTION_STALE_HALT = "EXECUTION_STALE_HALT" | | | # Input Validation | | | INPUT_FRAME_NOT_DECLARED = "INPUT_FRAME_NOT_DECLARED" | | | INPUT_KNOWN_UNKNOWNS_MISSING = "INPUT_KNOWN_UNKNOWNS_MISSING" | | | INPUT_IMPOSSIBLE_REQUEST = "INPUT_IMPOSSIBLE_REQUEST" | | | # Structural Validation | | | STRUCTURE_CHAIN_NOT_REPLAYABLE = "STRUCTURE_CHAIN_NOT_REPLAYABLE" | | | STRUCTURE_TERMINAL_STATE_MISMATCH = "STRUCTURE_TERMINAL_STATE_MISMATCH" | | | STRUCTURE_STEP_INVALID = "STRUCTURE_STEP_INVALID" | | | # State Validation (3A) | | | STATE_INVARIANT_VIOLATION = "STATE_INVARIANT_VIOLATION" | | | STATE_BOUNDARY_CROSSING = "STATE_BOUNDARY_CROSSING" | | | STATE_SHADOW_UNAVAILABLE = "STATE_SHADOW_UNAVAILABLE" | | | STATE_IDENTITY_LOST = "STATE_IDENTITY_LOST" | | | # Orientation State Register (3B) | | | ORIENTATION_STATE_MISSING = "ORIENTATION_STATE_MISSING" | | | ORIENTATION_BASIS_UNDECLARED = "ORIENTATION_BASIS_UNDECLARED" | | | ORIENTATION_BASIS_MISMATCH = "ORIENTATION_BASIS_MISMATCH" | | | ORIENTATION_ATTACHMENT_UNDECLARED = "ORIENTATION_ATTACHMENT_UNDECLARED" | | | ORIENTATION_TRANSFORM_UNSUPPORTED = "ORIENTATION_TRANSFORM_UNSUPPORTED" | | | ORIENTATION_FRAME_DRIFT = "ORIENTATION_FRAME_DRIFT" | | | ORIENTATION_MOVEMENT_BEFORE_ROTATION = "ORIENTATION_MOVEMENT_APPLIED_BEFORE_ROTATION" | | | # Narrative State Validation (3C) | | | NARRATIVE_OBSERVATION_INTERPRETATION_MERGED = "NARRATIVE_OBSERVATION_INTERPRETATION_MERGE" | | | NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED = "NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED" | | | NARRATIVE_REINTERPRETATION_UNATTRIBUTED = "NARRATIVE_REINTERPRETATION_UNATTRIBUTED" | | | NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED = "NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED" | | | RETROACTIVE_NARRATIVE_COLLAPSE = "RETROACTIVE_NARRATIVE_COLLAPSE" | | | # Risk Analysis | | | RISK_IRREVERSIBLE_LOW_CONFIDENCE = "RISK_IRREVERSIBLE_LOW_CONFIDENCE" | | | RISK_CASCADING_FAILURE_SURFACE = "RISK_CASCADING_FAILURE_SURFACE" | | | RISK_FRAGILE_ASSUMPTION = "RISK_FRAGILE_ASSUMPTION" | | | RISK_UNRESOLVED_CRITICAL = "RISK_UNRESOLVED_CRITICAL" | | | # Communication Validation | | | COMMUNICATION_TONE_OVERCLAIM = "COMMUNICATION_TONE_OVERCLAIM" | | | COMMUNICATION_COMPRESSION_LOSS = "COMMUNICATION_COMPRESSION_LOSS" | | | COMMUNICATION_IDENTITY_BINDING = "COMMUNICATION_IDENTITY_BINDING" | | | COMMUNICATION_FALSE_CERTAINTY = "COMMUNICATION_FALSE_CERTAINTY" | | | # Evidence Validation | | | EVIDENCE_CAUSALITY_OVERCLAIM = "EVIDENCE_CAUSALITY_OVERCLAIM" | | | EVIDENCE_CONFIDENCE_AS_TRUTH = "EVIDENCE_CONFIDENCE_AS_TRUTH" | | | EVIDENCE_DASHBOARD_AUTHORITY = "EVIDENCE_DASHBOARD_AUTHORITY" | | | EVIDENCE_NO_BASELINE = "EVIDENCE_NO_BASELINE" | | | # System | | | SCOPE_VIOLATION_MODULE_OVERREACH = "SCOPE_VIOLATION_MODULE_OVERREACH" | | | REVISE_LOOP_UNRESOLVED = "REVISE_LOOP_UNRESOLVED" | | | CONSTRAINT_INVALID = "CONSTRAINT_INVALID" | | | # ── Data contracts ───────────────────────────────────────────────────────── | | | @dataclass | | | class Reason: | | | code: str | | | module: str | | | severity: str | |

| signal_source: str = "" | |
| description: str = "" | |

| @dataclass | | | class Constraint: | | | source_module: str | | | scope: str | | | bound: str | | | reason_code: str | | | severity: str = "CONSTRAIN" | | | @dataclass | | | class ScopeViolation: | | | module: str | | | violation: str | | | correct_module: str | | | reason_code: str = ReasonCode.SCOPE_VIOLATION_MODULE_OVERREACH | | | @dataclass | | | class ModuleResult: | | | module: str | | | decision: str | |

| reasons: List[Reason] = field(default_factory=list) | |
| constraints: List[Constraint] = field(default_factory=list) | |
| scope_violations: List[ScopeViolation] = field(default_factory=list) | |
| data: Dict[str, Any] = field(default_factory=dict) | |

| @dataclass | | | class RunContext: | |

| pass_number: int = 1 | |
| max_passes: int = 2 | |
| revise_loop_active: bool = False | |
| prior_stable_state_id: Optional[str] = None | |
| prediction_model: str = "none" | |
| halt_resumed: bool = False | |

| @dataclass | | | class TaskInput: | | | prompt: str | |

| proposed_output: str = "" | |
| agent_id: Optional[str] = "local_user" | |
| permission_envelope: Optional[Dict[str, Any]] = field(default_factory=lambda: { | |
| "declared_capabilities": ["analyze", "validate"], | |

| "declared_authority_boundary": "local", | | | "allowed_actions": ["analyze", "validate"], | | | "resource_scope": "local", | |

| }) | |
| action: str = "analyze" | |
| intent: Optional[str] = None | |
| frame_type: Optional[str] = None | |
| known_unknowns: List[str] = field(default_factory=list) | |
| evidence: Dict[str, Any] = field(default_factory=dict) | |
| transform_chain: List[Dict[str, Any]] = field(default_factory=list) | |
| state: Dict[str, Any] = field(default_factory=dict) | |
| # OSR fields (Layer 3B) | |
| orientation: Optional[Dict[str, Any]] = None | |
| # NSV fields (Layer 3C) | |
| narrative: Optional[Dict[str, Any]] = None | |
| risk: Dict[str, Any] = field(default_factory=dict) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |

| # ── Helpers ──────────────────────────────────────────────────────────────── | |

| def utc_now() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def reason(code, module, severity, signal_source="", description="") -> Reason: | |
| return Reason(code=code, module=module, severity=severity, | |
| signal_source=signal_source, description=description) | |
| def constraint(module, scope, bound, code, severity="CONSTRAIN") -> Constraint: | |
| return Constraint(source_module=module, scope=scope, bound=bound, | |
| reason_code=code, severity=severity) | |
| def contains_any(text: str, terms: List[str]) -> bool: | |
| lower = text.lower() | |
| return any(t.lower() in lower for t in terms) | |
| def confidence_word(text: str) -> bool: | |

| return contains_any(text, ["clearly", "definitely", "proves", "confirmed", | | | "guaranteed", "always", "never", "certain"]) | | | def causal_word(text: str) -> bool: | | | return contains_any(text, ["because", "caused", "causes", "proves", | |

| "driven by", "confirms"]) | |
| def dashboard_authority_word(text: str) -> bool: | |

| return contains_any(text, ["green", "score", "rank", "safe to automate", | | | "trustworthy", "authority"]) | | | # ── Layer -1: Preflight ──────────────────────────────────────────────────── | | | class PreflightModule: | | | name = "preflight" | | | def run(self, task: TaskInput) -> ModuleResult: | | | text = task.prompt or "" | | | if not task.intent: | | | if any(q in text.lower() for q in ["evaluate", "analyze", "calculate", "what is", "should", "question"]): | | | task.intent = "analysis" | | | else: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.PREFLIGHT_PURPOSE_MISSING, | | | self.name, "CLARIFY", "intent_missing")]) | | | if contains_any(text, ["convince me", "prove that", "show why this is true", "make them believe"]): | | | return ModuleResult(self.name, "CONSTRAIN", | | | [reason(ReasonCode.PREFLIGHT_BIASED_FRAME, | | | self.name, "CONSTRAIN", "steered_or_persuasive_frame")], | | | [constraint(self.name, "output_type", "analysis_only_no_persuasion", | |

| ReasonCode.PREFLIGHT_BIASED_FRAME)]) | |
| if contains_any(text, ["ignore evidence", "force conclusion", "must conclude"]): | |

| return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.PREFLIGHT_FORCED_CONCLUSION, | | | self.name, "BLOCK", "forced_conclusion")]) | | | return ModuleResult(self.name, "PROCEED") | | | # ── Layer 0: Execution Control ───────────────────────────────────────────── | | | class ExecutionControlModule: | | | name = "execution_control" | | | def run(self, task: TaskInput) -> ModuleResult: | | | if not task.agent_id: | | | return ModuleResult(self.name, "HALT", | | | [reason(ReasonCode.EXECUTION_AGENT_UNVERIFIABLE, | | | self.name, "HALT", "missing_agent_id")]) | | | if not task.permission_envelope: | | | return ModuleResult(self.name, "DEFER", | | | [reason(ReasonCode.EXECUTION_PERMISSION_MISSING, | |

| self.name, "DEFER", "missing_permission_envelope")]) | |
| allowed = set(task.permission_envelope.get("allowed_actions", [])) | |

| scope = task.permission_envelope.get("resource_scope", "") | | | boundary = task.permission_envelope.get("declared_authority_boundary", "") | | | if task.action not in allowed: | | | return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.EXECUTION_SCOPE_VIOLATION, | |

| self.name, "BLOCK", "action_not_allowed")]) | |
| if scope not in ("local", "system", "external"): | |

| return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.EXECUTION_SCOPE_VIOLATION, | |

| self.name, "BLOCK", "invalid_resource_scope")]) | |
| if boundary == "local" and scope == "external": | |

| return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.EXECUTION_SCOPE_VIOLATION, | | | self.name, "BLOCK", "outside_authority_boundary")]) | | | return ModuleResult(self.name, "READY") | | | # ── Layer 1: Input Validation ────────────────────────────────────────────── | | | class InputValidationModule: | | | name = "input_validation" | | | def run(self, task: TaskInput) -> ModuleResult: | | | text = task.prompt or "" | | | if not text.strip(): | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.INPUT_FRAME_NOT_DECLARED, | |

| self.name, "CLARIFY", "empty_prompt")]) | |
| if contains_any(text, ["square circle", "rotate 90 in two directions at once"]): | |

| return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.INPUT_IMPOSSIBLE_REQUEST, | | | self.name, "BLOCK", "impossible_request")]) | | | if not task.frame_type: | | | task.frame_type = "factual" if "what is" in text.lower() else "analytical" | | | if task.frame_type in ("speculative", "operational") and not task.known_unknowns: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.INPUT_KNOWN_UNKNOWNS_MISSING, | | | self.name, "CLARIFY", "known_unknowns_missing")]) | | | return ModuleResult(self.name, "PROCEED") | | | # ── Layer 2: Structural Validation ───────────────────────────────────────── | | | class StructuralValidationModule: | | | name = "structure" | | | def run(self, task: TaskInput) -> ModuleResult: | | | chain = task.transform_chain | | | if not chain: | | | return ModuleResult(self.name, "PASS", | | | data={"replayable": None, "note": "no explicit transform chain supplied"}) | | | previous_after = None | |

| for i, step in enumerate(chain): | |
| missing = [k for k in ("operation", "state_before", "state_after") if k not in step] | |

| if missing: | | | return ModuleResult(self.name, "FAIL", | | | [reason(ReasonCode.STRUCTURE_CHAIN_NOT_REPLAYABLE, | |

| self.name, "FAIL", f"missing_step_{i}")], | |
| data={"missing_step_index": i, "missing_fields": missing}) | |
| if previous_after is not None and step["state_before"] != previous_after: | |

| return ModuleResult(self.name, "FAIL", | | | [reason(ReasonCode.STRUCTURE_STEP_INVALID, | |

| self.name, "FAIL", f"step_{i}_before_mismatch")]) | |
| previous_after = step["state_after"] | |
| claimed = task.metadata.get("claimed_final_state") | |

| if claimed is not None and previous_after is not None and claimed != previous_after: | | | return ModuleResult(self.name, "BLOCK", | | | [reason(ReasonCode.STRUCTURE_TERMINAL_STATE_MISMATCH, | |

| self.name, "BLOCK", "claimed_final_state_mismatch")]) | |
| return ModuleResult(self.name, "PASS", data={"replayable": True}) | |

| # ── Layer 3A: State Validation ───────────────────────────────────────────── | | | class StateValidationModule: | | | name = "state" | |

| def run(self, task: TaskInput, context: RunContext) -> ModuleResult: | |
| state = task.state or {} | |
| iv = int(state.get("invariant_violations", 0)) | |
| bc = int(state.get("boundary_crossings", 0)) | |
| ac = int(state.get("assumption_changes", 0)) | |
| ic = int(state.get("inconsistencies", 0)) | |
| pd = bool(state.get("prediction_divergence", False)) and context.prediction_model != "none" | |
| shadow_avail = bool(state.get("shadow_state_available", False)) | |
| needs_shadow = bool(state.get("requires_shadow_state", False)) | |
| inv_c = 0.30 * (1 if iv > 0 else 0) | |
| bnd_c = 0.25 * min(bc / 2, 1.0) | |
| asm_c = 0.15 * min(ac / 3, 1.0) | |
| inc_c = 0.20 * min(ic / 2, 1.0) | |
| prd_c = 0.10 * (1 if pd else 0) | |

| shd_p = 0.05 if needs_shadow and not shadow_avail else 0.0 | | | drift = min(inv_c + bnd_c + asm_c + inc_c + prd_c + shd_p, 1.0) | | | reasons = [] | | | if iv: | | | reasons.append(reason(ReasonCode.STATE_INVARIANT_VIOLATION, | | | self.name, "CONSTRAIN", "invariant_violations")) | | | if bc: | | | reasons.append(reason(ReasonCode.STATE_BOUNDARY_CROSSING, | | | self.name, "CONSTRAIN", "boundary_crossings")) | | | if needs_shadow and not shadow_avail: | | | reasons.append(reason(ReasonCode.STATE_SHADOW_UNAVAILABLE, | | | self.name, "INFO", "shadow_state_unavailable")) | | | if state.get("identity_lost", False): | | | reasons.append(reason(ReasonCode.STATE_IDENTITY_LOST, | | | self.name, "BLOCK", "identity_lost")) | | | return ModuleResult(self.name, "BLOCK", reasons, | |

| data=self._data(drift, inv_c, bnd_c, asm_c, inc_c, prd_c, "UNSTABLE")) | |
| if drift >= 0.70: | |

| cl = "UNSTABLE" | | | elif drift >= 0.30: | | | cl = "CONSTRAINED" | | | else: | | | cl = "STABLE" | | | return ModuleResult(self.name, cl, reasons, | | | data=self._data(drift, inv_c, bnd_c, asm_c, inc_c, prd_c, cl)) | | | @staticmethod | | | def _data(score, inv, bnd, asm, inc, prd, cl) -> Dict[str, Any]: | | | return { | |

| "drift_score": round(score, 4), | |
| "drift_components": { | |
| "invariant": round(inv, 4), | |
| "boundary": round(bnd, 4), | |
| "assumption": round(asm, 4), | |
| "inconsistency": round(inc, 4), | |
| "prediction": round(prd, 4), | |

| }, | | | "stability_classification": cl, | | | } | | | # ── Layer 3B: Orientation State Register ─────────────────────────────────── | |

| # Standard basis lookup table (top-down / right-hand convention) | |
| ORIENTATION_TABLE: Dict[str, Dict[str, Any]] = { | |
| "IDENTITY": { | |

| "local_x": "global_+X", | | | "local_y": "global_+Y", | | | "local_z": "global_+Z", | | | "basis_vector": {"x": [1, 0, 0], "y": [0, 1, 0], "z": [0, 0, 1]}, | | | }, | |

| "Y_CW_90_TOP_DOWN": { | |
| "local_x": "global_-Z", | |

| "local_y": "global_+Y", | | | "local_z": "global_+X", | | | "basis_vector": {"x": [0, 0, -1], "y": [0, 1, 0], "z": [1, 0, 0]}, | | | }, | | | "Y_CCW_90_TOP_DOWN": { | | | "local_x": "global_+Z", | | | "local_y": "global_+Y", | |

| "local_z": "global_-X", | |
| "basis_vector": {"x": [0, 0, 1], "y": [0, 1, 0], "z": [-1, 0, 0]}, | |

| }, | |

| "Y_180": { | |
| "local_x": "global_-X", | |

| "local_y": "global_+Y", | |

| "local_z": "global_-Z", | |
| "basis_vector": {"x": [-1, 0, 0], "y": [0, 1, 0], "z": [0, 0, -1]}, | |

| }, | | | "X_CW_90_RIGHT_SIDE": { | | | "local_x": "global_+X", | | | "local_y": "global_-Z", | | | "local_z": "global_+Y", | | | "basis_vector": {"x": [1, 0, 0], "y": [0, 0, -1], "z": [0, 1, 0]}, | | | }, | | | "Z_CW_90_TOP_DOWN": { | | | "local_x": "global_+Y", | | | "local_y": "global_-X", | | | "local_z": "global_+Z", | | | "basis_vector": {"x": [0, 1, 0], "y": [-1, 0, 0], "z": [0, 0, 1]}, | | | }, | | | } | |

| def _dot_add(vec: List[float], pos: List[float]) -> List[float]: | |
| return [round(pos[i] + vec[i], 6) for i in range(3)] | |

| class OrientationStateRegister: | | | """ | | | Layer 3B β€” Orientation State Register (OSR) | | | Validates and applies spatial transforms using an indexed orientation table. | | | No unlabeled orientation. No freehand axis guessing. | | | Transform must be applied before movement. | | | Object attachment must be declared. | | | If task.orientation is None: module passes through (non-spatial tasks unaffected). | | | """ | | | name = "orientation_state_register" | | | def run(self, task: TaskInput) -> ModuleResult: | | | o = task.orientation | | | if o is None: | | | return ModuleResult(self.name, "PASS", | |

| data={"note": "no orientation contract supplied; non-spatial task"}) | |
| reasons = [] | |

| # Rule 1: orientation_state_id required | | | state_id = o.get("orientation_state_id") | | | if not state_id: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_STATE_MISSING, | | | self.name, "CLARIFY", "missing_orientation_state_id")], | | | data={"orientation_table_keys": list(ORIENTATION_TABLE.keys())}) | | | # Rule 2: state_id must be in the table | | | if state_id not in ORIENTATION_TABLE: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_TRANSFORM_UNSUPPORTED, | |

| self.name, "CLARIFY", f"state_id_not_in_table: {state_id}")], | |
| data={"valid_ids": list(ORIENTATION_TABLE.keys())}) | |
| basis = ORIENTATION_TABLE[state_id] | |

| # Rule 3: frame_id required | | | if not o.get("frame_id"): | | | reasons.append(reason(ReasonCode.ORIENTATION_BASIS_UNDECLARED, | | | self.name, "CLARIFY", "missing_frame_id")) | | | # Rule 4: object_attachment required if movement is declared | | | movement = o.get("movement_event") or o.get("movement") | | | if movement and "object_attachment" not in o: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_ATTACHMENT_UNDECLARED, | | | self.name, "CLARIFY", "object_attachment_missing")], | |

| data={ | |
| "explanation": ( | |

| "Two valid cases exist: " | | | "(A) independent object: move along rotated local axis from current position. " | | | "(B) attached object: rotate object position first, then apply local movement." | | | ) | | | }) | | | # Rule 5: movement_before_rotation check | |

| transform = o.get("transform_event") | |
| if o.get("movement_applied_before_rotation", False): | |

| reasons.append(reason(ReasonCode.ORIENTATION_MOVEMENT_BEFORE_ROTATION, | | | self.name, "BLOCK", "movement_before_rotation")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | # Rule 6: basis mismatch check | | | declared_basis = o.get("basis_map") or o.get("local_basis") | | | if declared_basis: | |

| for axis in ("local_x", "local_y", "local_z"): | |
| declared_val = declared_basis.get(axis, "").replace(" ", "") | |
| table_val = basis.get(axis, "").replace(" ", "") | |
| if declared_val and table_val and declared_val.upper() != table_val.upper(): | |

| reasons.append(reason(ReasonCode.ORIENTATION_BASIS_MISMATCH, | | | self.name, "REVISE", | |

| f"{axis}_declared={declared_val}_table={table_val}")) | |
| if any(r.severity == "BLOCK" for r in reasons): | |
| return ModuleResult(self.name, "BLOCK", reasons, data={"registered_basis": basis}) | |
| if any(r.code == ReasonCode.ORIENTATION_BASIS_MISMATCH for r in reasons): | |
| return ModuleResult(self.name, "REVISE", reasons, data={"registered_basis": basis}) | |

| if reasons: | | | return ModuleResult(self.name, "CLARIFY", reasons, data={"registered_basis": basis}) | | | # Compute final position if object_state and movement are provided | |

| computed = {} | |
| obj = o.get("object_state", {}) | |

| if obj and movement and "object_attachment" in o: | |

| start = obj.get("coordinates", []) | |
| attachment = o["object_attachment"] | |
| mv_axis = movement.get("axis", "") | |
| mv_mag = float(movement.get("magnitude", 0)) | |
| bv = basis.get("basis_vector", {}) | |
| axis_vec = bv.get(mv_axis.replace("local_", "").lower(), []) | |

| if start and axis_vec: | | | if attachment == "attached_to_cube" and transform: | | | rot_id = state_id | |

| if rot_id == "Y_CW_90_TOP_DOWN": | |
| rx = start[2] | |
| ry = start[1] | |
| rz = -start[0] | |
| rotated = [rx, ry, rz] | |
| elif rot_id == "Y_CCW_90_TOP_DOWN": | |
| rx = -start[2] | |
| ry = start[1] | |
| rz = start[0] | |
| rotated = [rx, ry, rz] | |
| elif rot_id == "Y_180": | |
| rotated = [-start[0], start[1], -start[2]] | |

| else: | |

| rotated = list(start) | |
| move_vec = [v * mv_mag for v in axis_vec] | |
| final = _dot_add(move_vec, rotated) | |
| computed = {"case": "B_attached", "rotated_first": rotated, "final": final} | |

| else: | |

| move_vec = [v * mv_mag for v in axis_vec] | |
| final = _dot_add(move_vec, start) | |
| computed = {"case": "A_independent", "final": final} | |

| return ModuleResult(self.name, "PASS", | | | data={"registered_basis": basis, "computed": computed or None}) | | | # ── Layer 3C: Narrative State Validation ─────────────────────────────────── | | | class NarrativeStateValidationModule: | | | """ | | | Layer 3C β€” Narrative State Validation (NSV) | | | Tracks how interpretations change over time. | | | Does not determine truth. Evaluates whether: | | | - interpretation changes are declared | | | - evidence changes are tracked | | | - confidence changes are justified | | | - reconstructed narratives remain distinguishable from observations | | | If task.narrative is None: module passes through (non-narrative tasks unaffected). | | | Boundary contract: | | | NSV evaluates: how interpretation changed | | | Evidence Validation evaluates: whether evidence supports the claim | | | NSV does not determine truth. Evidence does not track narrative evolution. | | | """ | | | name = "narrative_state_validation" | | | def run(self, task: TaskInput) -> ModuleResult: | | | n = task.narrative | | | if n is None: | |

| return ModuleResult(self.name, "STABLE", data={"note": "no narrative contract supplied"}) | |
| reasons = [] | |
| constraints = [] | |
| observed = n.get("observed_input", "") | |
| initial_interp = n.get("initial_interpretation", "") | |
| updated_interp = n.get("updated_interpretation", "") | |
| new_constraints = n.get("new_constraints", []) | |
| evidence_delta = n.get("evidence_delta", []) | |
| conf_before = n.get("confidence_before", "") | |
| conf_after = n.get("confidence_after", "") | |
| retro_flag = bool(n.get("retroactive_rewrite", False)) | |
| conf_high_no_evidence = bool(n.get("confidence_high_with_low_evidence", False)) | |
| social_reinforcement = bool(n.get("social_reinforcement_as_validation", False)) | |

| # Check 1: Observation / interpretation separation | |

| if observed and initial_interp and observed.strip() == initial_interp.strip(): | |
| reasons.append(reason( | |

| ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED, | | | self.name, "CONSTRAIN", | | | "observation_equals_interpretation", | | | "Raw observation must be distinguishable from its interpretation.")) | | | constraints.append(constraint( | | | self.name, "output_type", "separate_observation_and_interpretation", | | | ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED)) | | | # Check 2: Narrative drift tracking β€” confidence escalation without evidence delta | |

| CONFIDENCE_RANK = {"low": 0, "medium": 1, "high": 2} | |
| before_rank = CONFIDENCE_RANK.get(conf_before, -1) | |
| after_rank = CONFIDENCE_RANK.get(conf_after, -1) | |
| if before_rank >= 0 and after_rank > before_rank and not evidence_delta: | |
| reasons.append(reason( | |

| ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED, | | | self.name, "REVISE", | | | "confidence_increased_without_evidence_delta", | | | f"Confidence escalated {conf_before} -> {conf_after} with no declared evidence delta.")) | | | # Check 3: Reinterpretation must be linked to a declared new constraint | | | if updated_interp and updated_interp != initial_interp: | | | if not new_constraints and not evidence_delta: | | | reasons.append(reason( | | | ReasonCode.NARRATIVE_REINTERPRETATION_UNATTRIBUTED, | | | self.name, "CLARIFY", | | | "reinterpretation_without_declared_cause", | | | "Interpretation changed but no new constraint or evidence delta was declared.")) | | | # Check 4: Memory confidence separation | | | if conf_high_no_evidence or social_reinforcement: | | | reasons.append(reason( | | | ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED, | | | self.name, "CONSTRAIN", | | | "confidence_not_grounded_in_evidence", | | | "High confidence without evidence, or social/emotional reinforcement used as validation.")) | | | constraints.append(constraint( | | | self.name, "confidence", "max_medium_without_evidence", | | | ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED)) | | | # Check 5: Retroactive reconstruction detection | | | if retro_flag: | | | reasons.append(reason( | | | ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE, | | | self.name, "REVISE", | | | "later_interpretation_presented_as_original_observation", | | | "A reconstructed interpretation must not be presented as unchanged historical observation.")) | | | meta_data = { | | | "confidence_before": conf_before, | | | "confidence_after": conf_after, | | | "evidence_delta_declared": bool(evidence_delta) | | | } | | | if any(r.severity == "BLOCK" for r in reasons): | | | return ModuleResult(self.name, "BLOCK", reasons, constraints, data=meta_data) | | | if any(r.code == ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE or | | | r.code == ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED | | | for r in reasons): | | | return ModuleResult(self.name, "REVISE", reasons, constraints, data=meta_data) | | | if reasons: | |

| top_severity = max((r.severity for r in reasons), | |
| key=lambda s: {"INFO": 0, "CLARIFY": 1, "CONSTRAIN": 2, "REVISE": 3}.get(s, 0)) | |

| return ModuleResult(self.name, top_severity, reasons, constraints, data=meta_data) | | | return ModuleResult(self.name, "STABLE", data=meta_data) | | | # ── Layer 4: Risk Analysis ───────────────────────────────────────────────── | | | class RiskAnalysisModule: | | | name = "risk" | |

| def run(self, task: TaskInput) -> ModuleResult: | |
| risk = task.risk or {} | |
| reasons = [] | |
| irreversible = bool(risk.get("irreversible", False)) | |
| cascading = bool(risk.get("cascading_failure_surface", False)) | |
| fragile = bool(risk.get("single_fragile_assumption", False)) | |
| mitigation_available = bool(risk.get("mitigation_available", True)) | |

| evidence_confidence = risk.get("evidence_confidence", "medium") | | | if cascading: | | | reasons.append(reason(ReasonCode.RISK_CASCADING_FAILURE_SURFACE, | | | self.name, "CRITICAL", "cascading_failure_surface")) | | | if irreversible and evidence_confidence == "low": | | | reasons.append(reason(ReasonCode.RISK_IRREVERSIBLE_LOW_CONFIDENCE, | | | self.name, "CRITICAL", "irreversible_low_confidence")) | | | if fragile: | | | reasons.append(reason(ReasonCode.RISK_FRAGILE_ASSUMPTION, | | | self.name, "CRITICAL", "single_fragile_assumption")) | | | if reasons: | | | if not mitigation_available or cascading or (irreversible and evidence_confidence == "low"): | | | reasons.append(reason(ReasonCode.RISK_UNRESOLVED_CRITICAL, | | | self.name, "BLOCK", "critical_unresolved")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | return ModuleResult(self.name, "CRITICAL", reasons) | | | if irreversible or risk.get("external_users_affected", False): | | | return ModuleResult(self.name, "RISK", | | | [reason("RISK_REVERSIBILITY_OR_EXTERNAL_IMPACT", | | | self.name, "RISK", "bounded_risk")]) | | | return ModuleResult(self.name, "SAFE") | | | # ── Layer 5: Communication Validation ───────────────────────────────────── | | | class CommunicationValidationModule: | | | name = "communication" | | | def run(self, task: TaskInput) -> ModuleResult: | | | text = task.proposed_output or "" | | | if not text: | |

| return ModuleResult(self.name, "VALID", data={"note": "no proposed output supplied"}) | |
| reasons = [] | |
| if confidence_word(text): | |

| reasons.append(reason(ReasonCode.COMMUNICATION_TONE_OVERCLAIM, | | | self.name, "REVISE", "certainty_language")) | | | if contains_any(text, ["you are the only one", "this defines you", "your identity is"]): | | | reasons.append(reason(ReasonCode.COMMUNICATION_IDENTITY_BINDING, | | | self.name, "BLOCK", "identity_binding")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | if len(text) < 25 and task.metadata.get("source_detail_length", 0) > 500: | | | reasons.append(reason(ReasonCode.COMMUNICATION_COMPRESSION_LOSS, | | | self.name, "REVISE", "severe_compression")) | | | if reasons: | | | return ModuleResult(self.name, "REVISE", reasons) | | | return ModuleResult(self.name, "VALID") | | | # ── Layer 6: Evidence Validation ─────────────────────────────────────────── | | | class EvidenceValidationModule: | | | name = "evidence" | | | def run(self, task: TaskInput) -> ModuleResult: | | | output = task.proposed_output or "" | |

| evidence = task.evidence or {} | |
| reasons = [] | |
| constraints = [] | |
| external_validation = bool(evidence.get("external_validation", False)) | |
| baseline = evidence.get("baseline", None) | |
| observations = evidence.get("observations", []) | |
| if causal_word(output) and not evidence.get("causality_validated", False): | |

| reasons.append(reason(ReasonCode.EVIDENCE_CAUSALITY_OVERCLAIM, | | | self.name, "BLOCK", "causality_without_mapping")) | | | if confidence_word(output) and not external_validation: | | | reasons.append(reason(ReasonCode.EVIDENCE_CONFIDENCE_AS_TRUTH, | | | self.name, "BLOCK", "confidence_as_truth")) | | | if dashboard_authority_word(output) and not external_validation: | | | reasons.append(reason(ReasonCode.EVIDENCE_DASHBOARD_AUTHORITY, | | | self.name, "BLOCK", "dashboard_or_status_as_authority")) | | | if baseline is None and contains_any(output, ["trend", "stable", "improving", "weakening", "safe to automate"]): | | | reasons.append(reason(ReasonCode.EVIDENCE_NO_BASELINE, | | | self.name, "CONSTRAIN", "baseline_missing")) | | | constraints.append(constraint(self.name, "confidence", | | | "max_low_without_baseline", | |

| ReasonCode.EVIDENCE_NO_BASELINE)) | |
| if any(r.severity == "BLOCK" for r in reasons): | |

| return ModuleResult(self.name, "BLOCK", reasons, constraints) | | | if reasons or constraints: | | | return ModuleResult(self.name, "CONSTRAIN", reasons, constraints) | | | if observations or external_validation: | | | return ModuleResult(self.name, "RELEASE") | | | return ModuleResult(self.name, "HOLD", | | | [reason(ReasonCode.EVIDENCE_NO_BASELINE, self.name, "HOLD", | | | "no_observations_or_validation")]) | | | # ── Layer 7: Decision Aggregator ─────────────────────────────────────────── | | | MODULE_TO_FINAL: Dict[str, FinalDecision] = { | | | "PROCEED": FinalDecision.AUTHORIZE, | | | "READY": FinalDecision.AUTHORIZE, | | | "PASS": FinalDecision.AUTHORIZE, | | | "STABLE": FinalDecision.AUTHORIZE, | | | "SAFE": FinalDecision.AUTHORIZE, | | | "VALID": FinalDecision.AUTHORIZE, | | | "RELEASE": FinalDecision.AUTHORIZE, | | | "DEFER": FinalDecision.HOLD, | | | "HALT": FinalDecision.HALT, | | | "FAIL": FinalDecision.CLARIFY, | | | "CLARIFY": FinalDecision.CLARIFY, | | | "CONSTRAIN": FinalDecision.CONSTRAIN, | | | "CONSTRAINED": FinalDecision.CONSTRAIN, | | | "REVISE": FinalDecision.REVISE, | | | "HOLD": FinalDecision.HOLD, | | | "UNSTABLE": FinalDecision.HOLD, | | | "RISK": FinalDecision.CONSTRAIN, | | | "CRITICAL": FinalDecision.BLOCK, | | | "BLOCK": FinalDecision.BLOCK, | | | } | | | RESTRICTIVENESS: Dict[str, int] = { | | | "full_output": 0, | | | "analysis_only": 1, | | | "analysis_only_no_persuasion": 2, | | | "separate_observation_and_interpretation": 2, | | | "max_medium_without_evidence": 3, | | | "max_low_without_baseline": 3, | | | "simulation_only": 3, | | | "no_execution": 4, | | | "BLOCK": 5, | | | } | | | class DecisionAggregator: | | | name = "decision_aggregator" | | | def aggregate(self, results: List[ModuleResult]) -> Dict[str, Any]: | | | final = FinalDecision.AUTHORIZE | |

| all_constraints: List[Constraint] = [] | |
| all_reasons: List[Reason] = [] | |
| all_violations: List[ScopeViolation] = [] | |

| for r in results: | | | all_constraints.extend(r.constraints) | | | all_reasons.extend(r.reasons) | | | all_violations.extend(r.scope_violations) | | | mapped = MODULE_TO_FINAL.get(r.decision, FinalDecision.HOLD) | | | if FINAL_PRIORITY[mapped] > FINAL_PRIORITY[final]: | | | final = mapped | |

| merged = self._merge_constraints(all_constraints) | |
| if any(c.severity == "BLOCK" for c in merged): | |

| final = FinalDecision.BLOCK | | | if final == FinalDecision.AUTHORIZE and any(c.severity not in ("INFO",) for c in merged): | | | final = FinalDecision.CONSTRAIN | | | return { | | | "decision": final.value, | |

| "constraints": [asdict(c) for c in merged], | |
| "reasons": [asdict(r) for r in all_reasons], | |
| "scope_violations": [asdict(v) for v in all_violations], | |

| } | | | @staticmethod | |

| def _merge_constraints(constraints: List[Constraint]) -> List[Constraint]: | |
| merged: Dict[str, Constraint] = {} | |

| for c in constraints: | | | if not c.scope or not c.bound: | |

| key = f"invalid_{len(merged)}" | |
| merged[key] = Constraint(c.source_module, "runtime", | |

| "constraint_invalid_requires_revision", | | | ReasonCode.CONSTRAINT_INVALID, "BLOCK") | | | continue | | | existing = merged.get(c.scope) | | | if existing is None: | | | merged[c.scope] = c | | | else: | |

| old_rank = RESTRICTIVENESS.get(existing.bound, 1) | |
| new_rank = RESTRICTIVENESS.get(c.bound, 1) | |
| if new_rank >= old_rank: | |
| merged[c.scope] = c | |
| return list(merged.values()) | |

| # ── Pipeline runner ──────────────────────────────────────────────────────── | | | class LighthouseRunner: | | | """ | | | Full pipeline: -1, 0, 1, 2, 3A, 3B, 3C, 4, 5, 6, 7 | | | Layer 3 sublayers run in order: | | | 3A StateValidationModule | | | 3B OrientationStateRegister (only gates on spatial tasks) | | | 3C NarrativeStateValidationModule (only gates on narrative tasks) | | | """ | |

| def __init__(self) -> None: | |
| self.preflight = PreflightModule() | |
| self.execution = ExecutionControlModule() | |
| self.input_val = InputValidationModule() | |
| self.structure = StructuralValidationModule() | |
| self.state = StateValidationModule() | |
| self.osr = OrientationStateRegister() | |
| self.nsv = NarrativeStateValidationModule() | |
| self.risk = RiskAnalysisModule() | |
| self.communication = CommunicationValidationModule() | |
| self.evidence = EvidenceValidationModule() | |
| self.aggregator = DecisionAggregator() | |
| def run(self, task: TaskInput, context: Optional[RunContext] = None) -> Dict[str, Any]: | |
| context = context or RunContext() | |
| results = [] | |
| results.append(self.preflight.run(task)) | |
| execution_result = self.execution.run(task) | |
| results.append(execution_result) | |

| if execution_result.decision != "HALT": | |

| results.append(self.input_val.run(task)) | |
| results.append(self.structure.run(task)) | |
| results.append(self.state.run(task, context)) # 3A | |
| results.append(self.osr.run(task)) # 3B | |
| results.append(self.nsv.run(task)) # 3C | |
| results.append(self.risk.run(task)) | |
| results.append(self.communication.run(task)) | |
| results.append(self.evidence.run(task)) | |
| aggregate = self.aggregator.aggregate(results) | |

| return self._receipt(task, context, results, aggregate) | |

| def _receipt(self, task, context, results, aggregate) -> Dict[str, Any]: | |
| module_map = {} | |

| for r in results: | |

| entry = {"decision": r.decision, "reasons": [asdict(x) for x in r.reasons]} | |
| if r.module == "state": | |
| entry.update(r.data) | |

| elif r.data: | |

| entry["data"] = r.data | |
| module_map[r.module] = entry | |
| execution_trace = [ | |

| "preflight", "execution_control", "input_validation", | | | "structure", "state", "orientation_state_register", | | | "narrative_state_validation", "risk", "communication", | | | "evidence", "decision_aggregator", | | | ] | | | return { | | | "system_version": SYSTEM_VERSION, | | | "reason_registry_version": REASON_REGISTRY_VERSION, | |

| "input_id": task.metadata.get("input_id", f"input-{uuid.uuid4().hex[:10]}"), | |
| "timestamp": utc_now(), | |
| "execution_state": module_map.get("execution_control", {}).get("decision", "READY"), | |

| "execution_trace": execution_trace, | | | "run_context": asdict(context), | | | "modules": module_map, | |

| "constraints": aggregate["constraints"], | |
| "decision": aggregate["decision"], | |
| "revise_trace": task.metadata.get("revise_trace", []), | |
| "scope_violations": aggregate["scope_violations"], | |
| "override": task.metadata.get("override", { | |
| "used": False, "tier": None, "authorized_by": None, | |

| "authorization_level": None, "reason": None, | |

| "timestamp": None, "ttl": None, | |
| }), | |

| } | | | # ── Demo cases ───────────────────────────────────────────────────────────── | | | def demo_cases() -> Dict[str, TaskInput]: | | | return { | | | "clean": TaskInput( | | | prompt="Evaluate whether this statement can proceed.", | | | proposed_output="The observed structure persisted across five windows with consistent topology.", | | | intent="analysis", | | | frame_type="factual", | | | evidence={ | | | "observations": ["structure present across five windows"], | | | "external_validation": True, | | | "baseline": "available", | | | "confidence": "medium", | | | "causality_validated": True, | | | }, | | | ), | | | "overclaim": TaskInput( | | | prompt="Evaluate this output.", | | | proposed_output="The system clearly confirms a stable trend because the dashboard color is green.", | | | intent="analysis", | | | frame_type="factual", | |

| evidence={ | |
| "observations": ["dashboard_color=green"], | |

| "external_validation": False, | | | "baseline": None, | | | "confidence": "low", | | | "causality_validated": False, | | | }, | | | ), | | | "risk": TaskInput( | | | prompt="Should the system proceed with an irreversible migration?", | | | proposed_output="Proceed with the migration.", | | | intent="analysis", | | | frame_type="operational", | | | known_unknowns=["rollback feasibility", "dependency map"], | | | risk={ | | | "irreversible": True, | | | "cascading_failure_surface": True, | | | "mitigation_available": False, | | | "evidence_confidence": "low", | | | "external_users_affected": True, | | | }, | | | ), | | | "drift": TaskInput( | | | prompt="Calculate state stability.", | | | proposed_output="State drift is constrained.", | | | intent="analysis", | | | frame_type="factual", | | | state={ | | | "invariant_violations": 1, | | | "boundary_crossings": 1, | | | "assumption_changes": 2, | | | "inconsistencies": 1, | | | "prediction_divergence": True, | | | "shadow_state_available": False, | | | "requires_shadow_state": True, | | | }, | | | evidence={"observations": ["state signals provided"], "baseline": "declared"}, | | | ), | | | "spatial_ambiguous": TaskInput( | | | prompt="A wheel spins clockwise. Which way is it rotating?", | | | intent="analysis", | | | frame_type="factual", | | | orientation={ | | | "frame_id": "observer_unspecified", | | | }, | | | ), | | | "spatial_locked": TaskInput( | | | prompt="Viewed from above, a sphere starts at (1,1,1). The cube rotates 90 degrees clockwise.", | | | intent="analysis", | | | frame_type="factual", | | | orientation={ | | | "orientation_state_id": "Y_CW_90_TOP_DOWN", | | | "frame_id": "observer_global", | | | "object_attachment": "independent_of_cube", | | | "object_state": { | | | "object_id": "red_sphere", | | | "coordinates": [1, 1, 1], | | | "coordinate_frame": "observer_global", | | | }, | | | "transform_event": { | | | "axis": "Y", | | | "rotation": "90_CW", | | | "reference_view": "top_down", | | | }, | | | "movement": { | | | "axis": "local_x", | | | "magnitude": 1, | | | }, | | | }, | | | ), | | | "spatial_attached": TaskInput( | | | prompt="Sphere at (1,1,1) is attached to the cube. Cube rotates 90 degrees CW top-down.", | | | intent="analysis", | | | frame_type="factual", | | | orientation={ | | | "orientation_state_id": "Y_CW_90_TOP_DOWN", | | | "frame_id": "observer_global", | | | "object_attachment": "attached_to_cube", | | | "object_state": { | | | "object_id": "red_sphere", | | | "coordinates": [1, 1, 1], | | | "coordinate_frame": "observer_global", | | | }, | | | "transform_event": { | | | "axis": "Y", | | | "rotation": "90_CW", | | | "reference_view": "top_down", | | | }, | | | "movement": { | | | "axis": "local_x", | | | "magnitude": 1, | | | }, | | | }, | | | ), | | | "narrative_drift": TaskInput( | | | prompt="Evaluate this interpretation update.", | | | intent="analysis", | | | frame_type="factual", | | | narrative={ | | | "observed_input": "The dashboard is green.", | | | "initial_interpretation": "The system may be operating normally.", | | | "updated_interpretation": "The system is confirmed stable.", | |

| "new_constraints": [], | |
| "evidence_delta": [], | |

| "confidence_before": "low", | | | "confidence_after": "high", | | | }, | | | ), | | | "narrative_retro": TaskInput( | | | prompt="Evaluate whether this memory reconstruction is valid.", | | | intent="analysis", | | | frame_type="factual", | | | narrative={ | | | "observed_input": "Pattern A was observed three times.", | | | "initial_interpretation": "Pattern A may indicate instability.", | | | "updated_interpretation": "We always knew Pattern A was the root cause.", | |

| "new_constraints": [], | |
| "evidence_delta": [], | |

| "confidence_before": "medium", | | | "confidence_after": "high", | | | "retroactive_rewrite": True, | | | }, | | | ), | | | "narrative_clean": TaskInput( | | | prompt="Evaluate this properly attributed interpretation update.", | | | intent="analysis", | | | frame_type="factual", | | | narrative={ | | | "observed_input": "System logged 5 errors in window 3.", | | | "initial_interpretation": "Possible instability in module X.", | | | "updated_interpretation": "Confirmed instability in module X.", | | | "new_constraints": ["baseline comparison now available"], | | | "evidence_delta": ["external_audit_confirmed_module_X_failure"], | | | "confidence_before": "low", | | | "confidence_after": "high", | | | "retroactive_rewrite": False, | | | }, | | | ), | | | } | |

| def task_from_json(path: str) -> TaskInput: | |
| with open(path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return TaskInput(**data) | |
| def main() -> None: | |

| parser = argparse.ArgumentParser(description="Public Lighthouse Core v1.7 reference runner") | |

| parser.add_argument("--demo", choices=list(demo_cases().keys()), help="Run a built-in demo case") | |
| parser.add_argument("--input", help="Path to JSON task input") | |
| parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON") | |
| args = parser.parse_args() | |

| if args.input: | | | task = task_from_json(args.input) | | | elif args.demo: | | | task = demo_cases()[args.demo] | | | else: | |

| task = demo_cases()["overclaim"] | |
| receipt = LighthouseRunner().run(task) | |
| print(json.dumps(receipt, indent=2 if args.pretty else None)) | |
| if __name__ == "__main__": | |
| main() |
── more in #ai-safety 4 stories Β· sorted by recency
sponsored brought to you by zahid.host 4,200+ EU-deployed projects
reading about agents? ship yours in a single git push.

Run your AI side-project on zahid.host

EU-based hosting, git-push deploys, automatic HTTPS, no cold starts. Free tier with a custom domain β€” perfect for shipping the agent you just read about.

$git push zahid main
β†’ Live at https://your-agent.zahid.host βœ“
Get free account β†’ Pricing
from €0/mo Β· no card required
LIVE [news/lighthouse-v1-7-py] indexed:0 read:33min 2026-05-26 Β· β€”