# lighthouse_v1_7.py

> Source: <https://gist.github.com/intheheartofit/e22a4c95700d4526b9926dc0cf3a1bd8>
> Published: 2026-05-26 14:12:19+00:00

| #!/usr/bin/env python3 | |
| """ | |
| PUBLIC LIGHTHOUSE CORE v1.7 | |
| AI Output Governance and Validation Runtime | |
| Adds two new sublayers to Layer 3 (State Validation): | |
| 3A State Validation (drift scoring) — unchanged from v1.6 | |
| 3B Orientation State Register (OSR) — spatial frame / basis tracking | |
| 3C Narrative State Validation (NSV) — interpretation drift tracking | |
| All other modules and the aggregator are unchanged from v1.6. | |
| Run: | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo clean --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo overclaim --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_ambiguous --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_locked --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_drift --pretty | |
| python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_retro --pretty | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| from dataclasses import dataclass, field, asdict | |
| from datetime import datetime, timezone | |
| from enum import Enum | |
| import json | |
| from typing import Any, Dict, List, Optional | |
| import uuid | |
| SYSTEM_VERSION = "PUBLIC_LIGHTHOUSE_CORE_1.7.0" | |
| REASON_REGISTRY_VERSION = "1.1.0" | |
| # ── Decision types ───────────────────────────────────────────────────────── | |
class FinalDecision(str, Enum):
    """Closed set of final outcomes the runtime can emit.

    The enum mixes in ``str``, so every member compares equal to the plain
    string that spells its name (``FinalDecision.BLOCK == "BLOCK"``).
    """

    AUTHORIZE = "AUTHORIZE"
    CONSTRAIN = "CONSTRAIN"
    REVISE = "REVISE"
    HOLD = "HOLD"
    CLARIFY = "CLARIFY"
    HALT = "HALT"
    BLOCK = "BLOCK"
# Numeric rank of every final decision, from BLOCK (7) down to AUTHORIZE (1).
# NOTE(review): presumably the aggregator keeps the highest-ranked decision;
# confirm against DecisionAggregator.aggregate.
FINAL_PRIORITY = {
    decision: rank
    for rank, decision in zip(
        range(7, 0, -1),
        (
            FinalDecision.BLOCK,
            FinalDecision.HALT,
            FinalDecision.CLARIFY,
            FinalDecision.HOLD,
            FinalDecision.REVISE,
            FinalDecision.CONSTRAIN,
            FinalDecision.AUTHORIZE,
        ),
    )
}
| # ── Reason codes ─────────────────────────────────────────────────────────── | |
class ReasonCode:
    """Registry of machine-readable reason codes, grouped by emitting layer.

    Every attribute is a plain string constant. Modules attach these codes
    to the ``Reason`` and ``Constraint`` records they produce.
    """

    # Preflight
    PREFLIGHT_PURPOSE_MISSING = "PREFLIGHT_PURPOSE_MISSING"
    PREFLIGHT_BIASED_FRAME = "PREFLIGHT_BIASED_FRAME"
    PREFLIGHT_FORCED_CONCLUSION = "PREFLIGHT_FORCED_CONCLUSION"

    # Execution Control
    EXECUTION_AGENT_UNVERIFIABLE = "EXECUTION_AGENT_UNVERIFIABLE"
    EXECUTION_SCOPE_VIOLATION = "EXECUTION_SCOPE_VIOLATION"
    EXECUTION_PERMISSION_MISSING = "EXECUTION_PERMISSION_MISSING"
    EXECUTION_STALE_HALT = "EXECUTION_STALE_HALT"

    # Input Validation
    INPUT_FRAME_NOT_DECLARED = "INPUT_FRAME_NOT_DECLARED"
    INPUT_KNOWN_UNKNOWNS_MISSING = "INPUT_KNOWN_UNKNOWNS_MISSING"
    INPUT_IMPOSSIBLE_REQUEST = "INPUT_IMPOSSIBLE_REQUEST"

    # Structural Validation
    STRUCTURE_CHAIN_NOT_REPLAYABLE = "STRUCTURE_CHAIN_NOT_REPLAYABLE"
    STRUCTURE_TERMINAL_STATE_MISMATCH = "STRUCTURE_TERMINAL_STATE_MISMATCH"
    STRUCTURE_STEP_INVALID = "STRUCTURE_STEP_INVALID"

    # State Validation (3A)
    STATE_INVARIANT_VIOLATION = "STATE_INVARIANT_VIOLATION"
    STATE_BOUNDARY_CROSSING = "STATE_BOUNDARY_CROSSING"
    STATE_SHADOW_UNAVAILABLE = "STATE_SHADOW_UNAVAILABLE"
    STATE_IDENTITY_LOST = "STATE_IDENTITY_LOST"

    # Orientation State Register (3B)
    ORIENTATION_STATE_MISSING = "ORIENTATION_STATE_MISSING"
    ORIENTATION_BASIS_UNDECLARED = "ORIENTATION_BASIS_UNDECLARED"
    ORIENTATION_BASIS_MISMATCH = "ORIENTATION_BASIS_MISMATCH"
    ORIENTATION_ATTACHMENT_UNDECLARED = "ORIENTATION_ATTACHMENT_UNDECLARED"
    ORIENTATION_TRANSFORM_UNSUPPORTED = "ORIENTATION_TRANSFORM_UNSUPPORTED"
    ORIENTATION_FRAME_DRIFT = "ORIENTATION_FRAME_DRIFT"
    # NOTE(review): the emitted value ("..._APPLIED_BEFORE_ROTATION") differs
    # from the attribute name. Left unchanged because the string is runtime
    # output; confirm which spelling downstream consumers expect.
    ORIENTATION_MOVEMENT_BEFORE_ROTATION = "ORIENTATION_MOVEMENT_APPLIED_BEFORE_ROTATION"

    # Narrative State Validation (3C)
    # NOTE(review): the emitted value ends in "_MERGE" while the attribute
    # ends in "_MERGED". Left unchanged for the same reason as above.
    NARRATIVE_OBSERVATION_INTERPRETATION_MERGED = "NARRATIVE_OBSERVATION_INTERPRETATION_MERGE"
    NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED = "NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED"
    NARRATIVE_REINTERPRETATION_UNATTRIBUTED = "NARRATIVE_REINTERPRETATION_UNATTRIBUTED"
    NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED = "NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED"
    RETROACTIVE_NARRATIVE_COLLAPSE = "RETROACTIVE_NARRATIVE_COLLAPSE"

    # Risk Analysis
    RISK_IRREVERSIBLE_LOW_CONFIDENCE = "RISK_IRREVERSIBLE_LOW_CONFIDENCE"
    RISK_CASCADING_FAILURE_SURFACE = "RISK_CASCADING_FAILURE_SURFACE"
    RISK_FRAGILE_ASSUMPTION = "RISK_FRAGILE_ASSUMPTION"
    RISK_UNRESOLVED_CRITICAL = "RISK_UNRESOLVED_CRITICAL"
    # RiskAnalysisModule emits this code for bounded risk as a bare string
    # literal; registering it here keeps the registry complete.
    RISK_REVERSIBILITY_OR_EXTERNAL_IMPACT = "RISK_REVERSIBILITY_OR_EXTERNAL_IMPACT"

    # Communication Validation
    COMMUNICATION_TONE_OVERCLAIM = "COMMUNICATION_TONE_OVERCLAIM"
    COMMUNICATION_COMPRESSION_LOSS = "COMMUNICATION_COMPRESSION_LOSS"
    COMMUNICATION_IDENTITY_BINDING = "COMMUNICATION_IDENTITY_BINDING"
    COMMUNICATION_FALSE_CERTAINTY = "COMMUNICATION_FALSE_CERTAINTY"

    # Evidence Validation
    EVIDENCE_CAUSALITY_OVERCLAIM = "EVIDENCE_CAUSALITY_OVERCLAIM"
    EVIDENCE_CONFIDENCE_AS_TRUTH = "EVIDENCE_CONFIDENCE_AS_TRUTH"
    EVIDENCE_DASHBOARD_AUTHORITY = "EVIDENCE_DASHBOARD_AUTHORITY"
    EVIDENCE_NO_BASELINE = "EVIDENCE_NO_BASELINE"

    # System
    SCOPE_VIOLATION_MODULE_OVERREACH = "SCOPE_VIOLATION_MODULE_OVERREACH"
    REVISE_LOOP_UNRESOLVED = "REVISE_LOOP_UNRESOLVED"
    CONSTRAINT_INVALID = "CONSTRAINT_INVALID"
| # ── Data contracts ───────────────────────────────────────────────────────── | |
@dataclass
class Reason:
    """A single finding reported by a module."""

    # Reason code string (see ReasonCode).
    code: str
    # Name of the module that raised the finding.
    module: str
    # Severity label chosen by the raising module (e.g. "BLOCK", "CLARIFY").
    severity: str
    # Short identifier of the signal that triggered the finding.
    signal_source: str = ""
    # Optional human-readable explanation.
    description: str = ""
@dataclass
class Constraint:
    """A bound that a module places on the eventual output."""

    # Module that imposed the bound.
    source_module: str
    # Aspect of the output being bounded (e.g. "output_type", "confidence").
    scope: str
    # Identifier of the bound itself (e.g. "analysis_only_no_persuasion").
    bound: str
    # Reason code that justifies the bound.
    reason_code: str
    # Severity label; defaults to "CONSTRAIN".
    severity: str = "CONSTRAIN"
@dataclass
class ScopeViolation:
    """Record of a module acting outside its own responsibility."""

    # Module that overreached.
    module: str
    # Description of what it did.
    violation: str
    # Module that should have handled it instead.
    correct_module: str
    # Defaults to the generic module-overreach code.
    reason_code: str = ReasonCode.SCOPE_VIOLATION_MODULE_OVERREACH
@dataclass
class ModuleResult:
    """Uniform result envelope returned by every module's ``run``."""

    # Name of the module that produced the result.
    module: str
    # Module-level decision string (e.g. "PASS", "CLARIFY", "BLOCK").
    decision: str
    # Findings backing the decision; a fresh list per instance.
    reasons: List[Reason] = field(default_factory=list)
    # Bounds imposed on the output; a fresh list per instance.
    constraints: List[Constraint] = field(default_factory=list)
    # Recorded module overreach; a fresh list per instance.
    scope_violations: List[ScopeViolation] = field(default_factory=list)
    # Free-form diagnostic payload; a fresh dict per instance.
    data: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RunContext:
    """Per-run settings handed to modules alongside the task."""

    # Current pass number; starts at 1.
    pass_number: int = 1
    # Upper bound on passes.
    max_passes: int = 2
    # Whether a revise loop is in progress.
    revise_loop_active: bool = False
    # Identifier of the last stable state, when one is known.
    prior_stable_state_id: Optional[str] = None
    # Prediction model name; "none" disables prediction-divergence scoring
    # in StateValidationModule.
    prediction_model: str = "none"
    # Whether the run resumes after a halt.
    halt_resumed: bool = False
def _default_permission_envelope() -> Dict[str, Any]:
    """Build a fresh default envelope: local analyze/validate only."""
    return {
        "declared_capabilities": ["analyze", "validate"],
        "declared_authority_boundary": "local",
        "allowed_actions": ["analyze", "validate"],
        "resource_scope": "local",
    }


@dataclass
class TaskInput:
    """Everything a single validation run needs to know about a task."""

    # The request being evaluated.
    prompt: str
    # Candidate output text to validate; empty when none is supplied.
    proposed_output: str = ""
    # Identity of the requesting agent.
    agent_id: Optional[str] = "local_user"
    # Declared capabilities and authority; a fresh dict per instance.
    permission_envelope: Optional[Dict[str, Any]] = field(
        default_factory=_default_permission_envelope)
    # Action requested under the permission envelope.
    action: str = "analyze"
    # Declared purpose; PreflightModule may infer and set it.
    intent: Optional[str] = None
    # Declared frame; InputValidationModule may infer and set it.
    frame_type: Optional[str] = None
    known_unknowns: List[str] = field(default_factory=list)
    evidence: Dict[str, Any] = field(default_factory=dict)
    transform_chain: List[Dict[str, Any]] = field(default_factory=list)
    state: Dict[str, Any] = field(default_factory=dict)
    # OSR fields (Layer 3B); None means the task is non-spatial.
    orientation: Optional[Dict[str, Any]] = None
    # NSV fields (Layer 3C); None means no narrative contract.
    narrative: Optional[Dict[str, Any]] = None
    risk: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
| # ── Helpers ──────────────────────────────────────────────────────────────── | |
def utc_now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def reason(code, module, severity, signal_source="", description="") -> Reason:
    """Shorthand constructor for a ``Reason`` record."""
    # Positional order matches the field order declared on Reason.
    return Reason(code, module, severity, signal_source, description)
def constraint(module, scope, bound, code, severity="CONSTRAIN") -> Constraint:
    """Shorthand constructor for a ``Constraint`` record.

    ``module`` becomes ``source_module`` and ``code`` becomes ``reason_code``.
    """
    return Constraint(
        source_module=module,
        scope=scope,
        bound=bound,
        reason_code=code,
        severity=severity,
    )
def contains_any(text: str, terms: List[str]) -> bool:
    """Return True when any of *terms* occurs in *text*, ignoring case.

    Matching is plain substring containment, not whole-word matching.
    """
    haystack = text.lower()
    for term in terms:
        if term.lower() in haystack:
            return True
    return False
def confidence_word(text: str) -> bool:
    """Return True when *text* contains certainty language.

    Each stem must begin at a word boundary. Plain substring matching
    flagged "uncertain" (contains "certain"), "unconfirmed" and "improves"
    (contains "proves") as overclaims. Suffixed forms such as "certainly"
    still match. Matching ignores case.
    """
    import re  # local import so the block does not depend on file-level imports

    stems = ("clearly", "definitely", "proves", "confirmed",
             "guaranteed", "always", "never", "certain")
    pattern = r"\b(?:" + "|".join(re.escape(stem) for stem in stems) + ")"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None
def causal_word(text: str) -> bool:
    """Return True when *text* contains causal-claim language.

    Each term must begin at a word boundary. Plain substring matching
    flagged "improves" and "disproves" because both contain "proves".
    Matching ignores case.
    """
    import re  # local import so the block does not depend on file-level imports

    terms = ("because", "caused", "causes", "proves", "driven by", "confirms")
    pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None
def dashboard_authority_word(text: str) -> bool:
    """Return True when *text* leans on dashboard or status vocabulary.

    Each term must begin at a word boundary. Plain substring matching
    flagged "frankly" (contains "rank"), "underscore" (contains "score")
    and "untrustworthy" (contains "trustworthy"). Suffixed forms such as
    "ranked" still match. Matching ignores case.
    """
    import re  # local import so the block does not depend on file-level imports

    terms = ("green", "score", "rank", "safe to automate",
             "trustworthy", "authority")
    pattern = r"\b(?:" + "|".join(re.escape(term) for term in terms) + ")"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None
| # ── Layer -1: Preflight ──────────────────────────────────────────────────── | |
class PreflightModule:
    """Layer -1: screens the prompt's purpose and framing.

    Decisions returned by ``run``:
      * "BLOCK"     - the prompt demands a forced conclusion.
      * "CLARIFY"   - no intent is declared and none can be inferred.
      * "CONSTRAIN" - the prompt uses a persuasive frame; the output is
                      bound to "analysis_only_no_persuasion".
      * "PROCEED"   - none of the above.
    """

    name = "preflight"

    def run(self, task: TaskInput) -> ModuleResult:
        """Evaluate ``task.prompt``; may set ``task.intent`` as a side effect."""
        text = task.prompt or ""

        # The BLOCK-level check runs first. It used to come last, so a
        # forced-conclusion prompt that also lacked intent keywords or used
        # persuasive wording returned CLARIFY or CONSTRAIN and the BLOCK was
        # never reported.
        if contains_any(text, ["ignore evidence", "force conclusion", "must conclude"]):
            return ModuleResult(self.name, "BLOCK",
                                [reason(ReasonCode.PREFLIGHT_FORCED_CONCLUSION,
                                        self.name, "BLOCK", "forced_conclusion")])

        if not task.intent:
            # Infer an analysis intent from common question wording.
            if contains_any(text, ["evaluate", "analyze", "calculate",
                                   "what is", "should", "question"]):
                task.intent = "analysis"
            else:
                return ModuleResult(self.name, "CLARIFY",
                                    [reason(ReasonCode.PREFLIGHT_PURPOSE_MISSING,
                                            self.name, "CLARIFY", "intent_missing")])

        if contains_any(text, ["convince me", "prove that",
                               "show why this is true", "make them believe"]):
            return ModuleResult(self.name, "CONSTRAIN",
                                [reason(ReasonCode.PREFLIGHT_BIASED_FRAME,
                                        self.name, "CONSTRAIN", "steered_or_persuasive_frame")],
                                [constraint(self.name, "output_type",
                                            "analysis_only_no_persuasion",
                                            ReasonCode.PREFLIGHT_BIASED_FRAME)])

        return ModuleResult(self.name, "PROCEED")
| # ── Layer 0: Execution Control ───────────────────────────────────────────── | |
class ExecutionControlModule:
    """Layer 0: checks agent identity and the permission envelope.

    Returns "HALT" without an agent id, "DEFER" without an envelope,
    "BLOCK" on any scope violation, and "READY" otherwise.
    """

    name = "execution_control"

    def _stop(self, decision: str, code: str, signal: str) -> ModuleResult:
        """Build a single-reason result whose severity equals its decision."""
        return ModuleResult(self.name, decision,
                            [reason(code, self.name, decision, signal)])

    def run(self, task: TaskInput) -> ModuleResult:
        """Validate ``task.agent_id``, ``task.permission_envelope`` and ``task.action``."""
        if not task.agent_id:
            return self._stop("HALT", ReasonCode.EXECUTION_AGENT_UNVERIFIABLE,
                              "missing_agent_id")

        envelope = task.permission_envelope
        if not envelope:
            return self._stop("DEFER", ReasonCode.EXECUTION_PERMISSION_MISSING,
                              "missing_permission_envelope")

        permitted = set(envelope.get("allowed_actions", []))
        resource_scope = envelope.get("resource_scope", "")
        authority = envelope.get("declared_authority_boundary", "")

        violation = None
        if task.action not in permitted:
            violation = "action_not_allowed"
        elif resource_scope not in ("local", "system", "external"):
            violation = "invalid_resource_scope"
        elif authority == "local" and resource_scope == "external":
            # A locally bounded agent may not reach external resources.
            violation = "outside_authority_boundary"

        if violation is not None:
            return self._stop("BLOCK", ReasonCode.EXECUTION_SCOPE_VIOLATION, violation)
        return ModuleResult(self.name, "READY")
| # ── Layer 1: Input Validation ────────────────────────────────────────────── | |
class InputValidationModule:
    """Layer 1: validates the prompt and its declared frame.

    Returns "CLARIFY" for an empty prompt or for a speculative/operational
    frame with no known unknowns, "BLOCK" for a listed impossible request,
    and "PROCEED" otherwise.
    """

    name = "input_validation"

    def _finding(self, decision: str, code: str, signal: str) -> ModuleResult:
        """Build a single-reason result whose severity equals its decision."""
        return ModuleResult(self.name, decision,
                            [reason(code, self.name, decision, signal)])

    def run(self, task: TaskInput) -> ModuleResult:
        """Check ``task.prompt``; may set ``task.frame_type`` as a side effect."""
        prompt = task.prompt or ""

        if not prompt.strip():
            return self._finding("CLARIFY", ReasonCode.INPUT_FRAME_NOT_DECLARED,
                                 "empty_prompt")

        if contains_any(prompt, ["square circle", "rotate 90 in two directions at once"]):
            return self._finding("BLOCK", ReasonCode.INPUT_IMPOSSIBLE_REQUEST,
                                 "impossible_request")

        if not task.frame_type:
            # Default the frame from the prompt wording when none is declared.
            if "what is" in prompt.lower():
                task.frame_type = "factual"
            else:
                task.frame_type = "analytical"

        needs_unknowns = task.frame_type in ("speculative", "operational")
        if needs_unknowns and not task.known_unknowns:
            return self._finding("CLARIFY", ReasonCode.INPUT_KNOWN_UNKNOWNS_MISSING,
                                 "known_unknowns_missing")

        return ModuleResult(self.name, "PROCEED")
| # ── Layer 2: Structural Validation ───────────────────────────────────────── | |
class StructuralValidationModule:
    """Layer 2: checks that a declared transform chain can be replayed.

    Each step must carry "operation", "state_before" and "state_after",
    and each step's "state_before" must equal the previous step's
    "state_after". A claimed final state in the task metadata must match
    the last "state_after".
    """

    name = "structure"
    _REQUIRED_KEYS = ("operation", "state_before", "state_after")

    def run(self, task: TaskInput) -> ModuleResult:
        """Return "PASS", "FAIL" (broken chain) or "BLOCK" (final-state mismatch)."""
        steps = task.transform_chain
        if not steps:
            return ModuleResult(
                self.name, "PASS",
                data={"replayable": None,
                      "note": "no explicit transform chain supplied"})

        last_after = None
        for index, step in enumerate(steps):
            absent = [key for key in self._REQUIRED_KEYS if key not in step]
            if absent:
                return ModuleResult(
                    self.name, "FAIL",
                    [reason(ReasonCode.STRUCTURE_CHAIN_NOT_REPLAYABLE,
                            self.name, "FAIL", f"missing_step_{index}")],
                    data={"missing_step_index": index, "missing_fields": absent})

            # Continuity is only checked once a previous non-None state exists.
            if last_after is not None and step["state_before"] != last_after:
                return ModuleResult(
                    self.name, "FAIL",
                    [reason(ReasonCode.STRUCTURE_STEP_INVALID,
                            self.name, "FAIL", f"step_{index}_before_mismatch")])
            last_after = step["state_after"]

        claimed_final = task.metadata.get("claimed_final_state")
        both_known = claimed_final is not None and last_after is not None
        if both_known and claimed_final != last_after:
            return ModuleResult(
                self.name, "BLOCK",
                [reason(ReasonCode.STRUCTURE_TERMINAL_STATE_MISMATCH,
                        self.name, "BLOCK", "claimed_final_state_mismatch")])

        return ModuleResult(self.name, "PASS", data={"replayable": True})
| # ── Layer 3A: State Validation ───────────────────────────────────────────── | |
class StateValidationModule:
    """Layer 3A: scores state drift and classifies stability.

    The drift score is a weighted sum capped at 1.0:
      * 0.30 if any invariant violation is present
      * 0.25 scaled by boundary crossings (saturates at 2)
      * 0.15 scaled by assumption changes (saturates at 3)
      * 0.20 scaled by inconsistencies (saturates at 2)
      * 0.10 if prediction divergence is flagged and a prediction model is set
      * 0.05 if a shadow state is required but unavailable

    A score of 0.70 or more is "UNSTABLE", 0.30 or more is "CONSTRAINED",
    anything lower is "STABLE". A lost identity always returns "BLOCK".
    """

    name = "state"

    def run(self, task: TaskInput, context: RunContext) -> ModuleResult:
        """Score ``task.state`` and return the stability decision."""
        state = task.state or {}

        def count(key: str) -> int:
            # ``or 0`` tolerates an explicit None value; max() stops a
            # negative count from lowering the drift score.
            return max(int(state.get(key) or 0), 0)

        iv = count("invariant_violations")
        bc = count("boundary_crossings")
        ac = count("assumption_changes")
        ic = count("inconsistencies")
        # Divergence only counts when a prediction model is configured.
        pd = bool(state.get("prediction_divergence", False)) and context.prediction_model != "none"
        shadow_missing = (bool(state.get("requires_shadow_state", False))
                          and not bool(state.get("shadow_state_available", False)))

        inv_c = 0.30 * (1 if iv > 0 else 0)
        bnd_c = 0.25 * min(bc / 2, 1.0)
        asm_c = 0.15 * min(ac / 3, 1.0)
        inc_c = 0.20 * min(ic / 2, 1.0)
        prd_c = 0.10 * (1 if pd else 0)
        shd_p = 0.05 if shadow_missing else 0.0
        drift = min(inv_c + bnd_c + asm_c + inc_c + prd_c + shd_p, 1.0)

        reasons = []
        if iv:
            reasons.append(reason(ReasonCode.STATE_INVARIANT_VIOLATION,
                                  self.name, "CONSTRAIN", "invariant_violations"))
        if bc:
            reasons.append(reason(ReasonCode.STATE_BOUNDARY_CROSSING,
                                  self.name, "CONSTRAIN", "boundary_crossings"))
        if shadow_missing:
            reasons.append(reason(ReasonCode.STATE_SHADOW_UNAVAILABLE,
                                  self.name, "INFO", "shadow_state_unavailable"))

        if state.get("identity_lost", False):
            reasons.append(reason(ReasonCode.STATE_IDENTITY_LOST,
                                  self.name, "BLOCK", "identity_lost"))
            return ModuleResult(
                self.name, "BLOCK", reasons,
                data=self._data(drift, inv_c, bnd_c, asm_c, inc_c, prd_c,
                                "UNSTABLE", shd_p))

        if drift >= 0.70:
            cl = "UNSTABLE"
        elif drift >= 0.30:
            cl = "CONSTRAINED"
        else:
            cl = "STABLE"
        return ModuleResult(
            self.name, cl, reasons,
            data=self._data(drift, inv_c, bnd_c, asm_c, inc_c, prd_c, cl, shd_p))

    @staticmethod
    def _data(score, inv, bnd, asm, inc, prd, cl, shd=0.0) -> Dict[str, Any]:
        """Package the score, its components and the classification.

        ``shd`` is the shadow-state penalty. It is reported under "shadow"
        so the listed components account for everything in the score
        (before the 1.0 cap).
        """
        return {
            "drift_score": round(score, 4),
            "drift_components": {
                "invariant": round(inv, 4),
                "boundary": round(bnd, 4),
                "assumption": round(asm, 4),
                "inconsistency": round(inc, 4),
                "prediction": round(prd, 4),
                "shadow": round(shd, 4),
            },
            "stability_classification": cl,
        }
| # ── Layer 3B: Orientation State Register ─────────────────────────────────── | |
| # Standard basis lookup table (top-down / right-hand convention) | |
# Registered orientation states. Each entry names the global direction of
# every local axis ("local_x/y/z") and gives the same information as unit
# vectors under "basis_vector".
ORIENTATION_TABLE: Dict[str, Dict[str, Any]] = {
    "IDENTITY": {
        "local_x": "global_+X",
        "local_y": "global_+Y",
        "local_z": "global_+Z",
        "basis_vector": {
            "x": [1, 0, 0],
            "y": [0, 1, 0],
            "z": [0, 0, 1],
        },
    },
    "Y_CW_90_TOP_DOWN": {
        "local_x": "global_-Z",
        "local_y": "global_+Y",
        "local_z": "global_+X",
        "basis_vector": {
            "x": [0, 0, -1],
            "y": [0, 1, 0],
            "z": [1, 0, 0],
        },
    },
    "Y_CCW_90_TOP_DOWN": {
        "local_x": "global_+Z",
        "local_y": "global_+Y",
        "local_z": "global_-X",
        "basis_vector": {
            "x": [0, 0, 1],
            "y": [0, 1, 0],
            "z": [-1, 0, 0],
        },
    },
    "Y_180": {
        "local_x": "global_-X",
        "local_y": "global_+Y",
        "local_z": "global_-Z",
        "basis_vector": {
            "x": [-1, 0, 0],
            "y": [0, 1, 0],
            "z": [0, 0, -1],
        },
    },
    "X_CW_90_RIGHT_SIDE": {
        "local_x": "global_+X",
        "local_y": "global_-Z",
        "local_z": "global_+Y",
        "basis_vector": {
            "x": [1, 0, 0],
            "y": [0, 0, -1],
            "z": [0, 1, 0],
        },
    },
    "Z_CW_90_TOP_DOWN": {
        "local_x": "global_+Y",
        "local_y": "global_-X",
        "local_z": "global_+Z",
        "basis_vector": {
            "x": [0, 1, 0],
            "y": [-1, 0, 0],
            "z": [0, 0, 1],
        },
    },
}
| def _dot_add(vec: List[float], pos: List[float]) -> List[float]: | |
| return [round(pos[i] + vec[i], 6) for i in range(3)] | |
class OrientationStateRegister:
    """
    Layer 3B — Orientation State Register (OSR)

    Validates and applies spatial transforms using an indexed orientation table.
    No unlabeled orientation. No freehand axis guessing.
    Transform must be applied before movement.
    Object attachment must be declared.
    If task.orientation is None: module passes through (non-spatial tasks unaffected).

    Decisions returned by ``run``:
      * "CLARIFY" - state id missing or unknown, frame id missing, or
                    movement declared without ``object_attachment``.
      * "BLOCK"   - movement was applied before rotation.
      * "REVISE"  - a declared basis axis disagrees with the table.
      * "PASS"    - contract is consistent; ``data["computed"]`` holds the
                    final position when enough inputs were supplied.
    """

    name = "orientation_state_register"

    def run(self, task: TaskInput) -> ModuleResult:
        """Validate ``task.orientation`` against ORIENTATION_TABLE."""
        o = task.orientation
        if o is None:
            return ModuleResult(self.name, "PASS",
                                data={"note": "no orientation contract supplied; non-spatial task"})

        # Rule 1: orientation_state_id required
        state_id = o.get("orientation_state_id")
        if not state_id:
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.ORIENTATION_STATE_MISSING,
                                        self.name, "CLARIFY", "missing_orientation_state_id")],
                                data={"orientation_table_keys": list(ORIENTATION_TABLE.keys())})

        # Rule 2: state_id must be in the table
        if state_id not in ORIENTATION_TABLE:
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.ORIENTATION_TRANSFORM_UNSUPPORTED,
                                        self.name, "CLARIFY", f"state_id_not_in_table: {state_id}")],
                                data={"valid_ids": list(ORIENTATION_TABLE.keys())})
        basis = ORIENTATION_TABLE[state_id]

        reasons = []
        # Rule 3: frame_id required
        if not o.get("frame_id"):
            reasons.append(reason(ReasonCode.ORIENTATION_BASIS_UNDECLARED,
                                  self.name, "CLARIFY", "missing_frame_id"))

        # Rule 4: object_attachment required if movement is declared
        movement = o.get("movement_event") or o.get("movement")
        if movement and "object_attachment" not in o:
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.ORIENTATION_ATTACHMENT_UNDECLARED,
                                        self.name, "CLARIFY", "object_attachment_missing")],
                                data={
                                    "explanation": (
                                        "Two valid cases exist: "
                                        "(A) independent object: move along rotated local axis from current position. "
                                        "(B) attached object: rotate object position first, then apply local movement."
                                    )
                                })

        # Rule 5: movement_before_rotation check
        if o.get("movement_applied_before_rotation", False):
            reasons.append(reason(ReasonCode.ORIENTATION_MOVEMENT_BEFORE_ROTATION,
                                  self.name, "BLOCK", "movement_before_rotation"))
            return ModuleResult(self.name, "BLOCK", reasons)

        # Rule 6: basis mismatch check
        mismatches = self._basis_mismatches(o, basis)
        reasons.extend(mismatches)

        # The only BLOCK-severity reason already returned under Rule 5, so
        # only REVISE and CLARIFY outcomes remain here.
        if mismatches:
            return ModuleResult(self.name, "REVISE", reasons, data={"registered_basis": basis})
        if reasons:
            return ModuleResult(self.name, "CLARIFY", reasons, data={"registered_basis": basis})

        computed = self._compute_position(o, basis, movement)
        return ModuleResult(self.name, "PASS",
                            data={"registered_basis": basis, "computed": computed})

    def _basis_mismatches(self, o: Dict[str, Any], basis: Dict[str, Any]) -> List[Reason]:
        """Return one REVISE reason per declared axis that contradicts the table."""
        declared_basis = o.get("basis_map") or o.get("local_basis")
        found = []
        if not declared_basis:
            return found
        for axis in ("local_x", "local_y", "local_z"):
            # ``or ""`` tolerates an axis explicitly declared as None.
            declared_val = str(declared_basis.get(axis) or "").replace(" ", "")
            table_val = basis.get(axis, "").replace(" ", "")
            if declared_val and table_val and declared_val.upper() != table_val.upper():
                found.append(reason(ReasonCode.ORIENTATION_BASIS_MISMATCH,
                                    self.name, "REVISE",
                                    f"{axis}_declared={declared_val}_table={table_val}"))
        return found

    @staticmethod
    def _compute_position(o: Dict[str, Any], basis: Dict[str, Any],
                          movement: Any) -> Optional[Dict[str, Any]]:
        """Compute the final position, or None when inputs are insufficient.

        Case B (object attached and a transform event declared) rotates the
        start position by the registered basis before moving. Any other
        case moves from the start position unrotated.
        """
        obj = o.get("object_state", {})
        if not (obj and movement and "object_attachment" in o):
            return None

        start = obj.get("coordinates", [])
        bv = basis.get("basis_vector", {})
        # Lowercase before stripping the prefix so "LOCAL_X" also resolves.
        axis_key = movement.get("axis", "").lower().replace("local_", "")
        axis_vec = bv.get(axis_key, [])
        magnitude = float(movement.get("magnitude", 0))
        # Three coordinates are needed; skip the computation rather than
        # raise IndexError on a short list.
        if not start or not axis_vec or len(start) < 3:
            return None

        move_vec = [v * magnitude for v in axis_vec]
        if o["object_attachment"] == "attached_to_cube" and o.get("transform_event"):
            # Rotate by expressing the start point in the rotated basis:
            # p' = p.x * X + p.y * Y + p.z * Z. This gives the same result
            # as the earlier hard-coded Y-axis cases and also covers the
            # X- and Z-axis states, which used to be left unrotated.
            rotated = [
                sum(start[k] * bv[name][j] for k, name in enumerate(("x", "y", "z")))
                for j in range(3)
            ]
            return {"case": "B_attached", "rotated_first": rotated,
                    "final": _dot_add(move_vec, rotated)}
        return {"case": "A_independent", "final": _dot_add(move_vec, start)}
| # ── Layer 3C: Narrative State Validation ─────────────────────────────────── | |
class NarrativeStateValidationModule:
    """
    Layer 3C — Narrative State Validation (NSV)

    Tracks how interpretations change over time.
    Does not determine truth. Evaluates whether:
      - interpretation changes are declared
      - evidence changes are tracked
      - confidence changes are justified
      - reconstructed narratives remain distinguishable from observations

    If task.narrative is None: module passes through (non-narrative tasks unaffected).

    Boundary contract:
      NSV evaluates: how interpretation changed
      Evidence Validation evaluates: whether evidence supports the claim
      NSV does not determine truth. Evidence does not track narrative evolution.
    """

    name = "narrative_state_validation"

    def run(self, task: TaskInput) -> ModuleResult:
        """Run the five narrative checks and return the module decision."""
        narrative = task.narrative
        if narrative is None:
            return ModuleResult(self.name, "STABLE",
                                data={"note": "no narrative contract supplied"})

        findings = []
        bounds = []

        observed = narrative.get("observed_input", "")
        first_reading = narrative.get("initial_interpretation", "")
        later_reading = narrative.get("updated_interpretation", "")
        declared_constraints = narrative.get("new_constraints", [])
        evidence_delta = narrative.get("evidence_delta", [])
        conf_before = narrative.get("confidence_before", "")
        conf_after = narrative.get("confidence_after", "")
        is_retroactive = bool(narrative.get("retroactive_rewrite", False))
        ungrounded_confidence = bool(narrative.get("confidence_high_with_low_evidence", False))
        socially_reinforced = bool(narrative.get("social_reinforcement_as_validation", False))

        # Check 1: an observation must not be identical to its interpretation.
        if observed and first_reading and observed.strip() == first_reading.strip():
            findings.append(reason(
                ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED,
                self.name, "CONSTRAIN",
                "observation_equals_interpretation",
                "Raw observation must be distinguishable from its interpretation."))
            bounds.append(constraint(
                self.name, "output_type", "separate_observation_and_interpretation",
                ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED))

        # Check 2: confidence may only rise alongside a declared evidence delta.
        # Unrecognised confidence labels rank as -1.
        ladder = {"low": 0, "medium": 1, "high": 2}
        rank_before = ladder.get(conf_before, -1)
        rank_after = ladder.get(conf_after, -1)
        if rank_before >= 0 and rank_after > rank_before and not evidence_delta:
            findings.append(reason(
                ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED,
                self.name, "REVISE",
                "confidence_increased_without_evidence_delta",
                f"Confidence escalated {conf_before} -> {conf_after} with no declared evidence delta."))

        # Check 3: a changed interpretation needs a declared cause.
        interpretation_changed = bool(later_reading) and later_reading != first_reading
        if interpretation_changed and not declared_constraints and not evidence_delta:
            findings.append(reason(
                ReasonCode.NARRATIVE_REINTERPRETATION_UNATTRIBUTED,
                self.name, "CLARIFY",
                "reinterpretation_without_declared_cause",
                "Interpretation changed but no new constraint or evidence delta was declared."))

        # Check 4: confidence must be grounded in evidence.
        if ungrounded_confidence or socially_reinforced:
            findings.append(reason(
                ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED,
                self.name, "CONSTRAIN",
                "confidence_not_grounded_in_evidence",
                "High confidence without evidence, or social/emotional reinforcement used as validation."))
            bounds.append(constraint(
                self.name, "confidence", "max_medium_without_evidence",
                ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED))

        # Check 5: retroactive reconstruction.
        if is_retroactive:
            findings.append(reason(
                ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE,
                self.name, "REVISE",
                "later_interpretation_presented_as_original_observation",
                "A reconstructed interpretation must not be presented as unchanged historical observation."))

        summary = {
            "confidence_before": conf_before,
            "confidence_after": conf_after,
            "evidence_delta_declared": bool(evidence_delta),
        }

        if not findings:
            return ModuleResult(self.name, "STABLE", data=summary)

        if any(f.severity == "BLOCK" for f in findings):
            decision = "BLOCK"
        elif any(f.code in (ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE,
                            ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED)
                 for f in findings):
            decision = "REVISE"
        else:
            # Otherwise the decision is the highest-ranked severity present.
            severity_rank = {"INFO": 0, "CLARIFY": 1, "CONSTRAIN": 2, "REVISE": 3}
            decision = max((f.severity for f in findings),
                           key=lambda label: severity_rank.get(label, 0))
        return ModuleResult(self.name, decision, findings, bounds, data=summary)
| # ── Layer 4: Risk Analysis ───────────────────────────────────────────────── | |
class RiskAnalysisModule:
    """Layer 4: classifies declared risk flags.

    Returns "BLOCK" when a critical finding is unresolved, "CRITICAL" when
    critical findings exist but are mitigable, "RISK" for bounded risk
    (irreversible or affecting external users), and "SAFE" otherwise.
    """

    name = "risk"

    def run(self, task: TaskInput) -> ModuleResult:
        """Evaluate the flags in ``task.risk``."""
        flags = task.risk or {}

        irreversible = bool(flags.get("irreversible", False))
        cascading = bool(flags.get("cascading_failure_surface", False))
        fragile = bool(flags.get("single_fragile_assumption", False))
        # Mitigation is assumed available unless declared otherwise.
        can_mitigate = bool(flags.get("mitigation_available", True))
        low_confidence_irreversible = (
            irreversible and flags.get("evidence_confidence", "medium") == "low")

        critical_checks = (
            (cascading, ReasonCode.RISK_CASCADING_FAILURE_SURFACE,
             "cascading_failure_surface"),
            (low_confidence_irreversible, ReasonCode.RISK_IRREVERSIBLE_LOW_CONFIDENCE,
             "irreversible_low_confidence"),
            (fragile, ReasonCode.RISK_FRAGILE_ASSUMPTION,
             "single_fragile_assumption"),
        )
        findings = [reason(code, self.name, "CRITICAL", signal)
                    for triggered, code, signal in critical_checks if triggered]

        if findings:
            unresolved = (not can_mitigate) or cascading or low_confidence_irreversible
            if not unresolved:
                return ModuleResult(self.name, "CRITICAL", findings)
            findings.append(reason(ReasonCode.RISK_UNRESOLVED_CRITICAL,
                                   self.name, "BLOCK", "critical_unresolved"))
            return ModuleResult(self.name, "BLOCK", findings)

        if irreversible or flags.get("external_users_affected", False):
            return ModuleResult(self.name, "RISK",
                                [reason("RISK_REVERSIBILITY_OR_EXTERNAL_IMPACT",
                                        self.name, "RISK", "bounded_risk")])
        return ModuleResult(self.name, "SAFE")
| # ── Layer 5: Communication Validation ───────────────────────────────────── | |
class CommunicationValidationModule:
    """Layer 5: checks the wording of the proposed output.

    Returns "BLOCK" for identity-binding language, "REVISE" for certainty
    language or severe compression, and "VALID" otherwise (including when
    no proposed output is supplied).
    """

    name = "communication"

    def run(self, task: TaskInput) -> ModuleResult:
        """Inspect ``task.proposed_output``."""
        output = task.proposed_output or ""
        if not output:
            return ModuleResult(self.name, "VALID",
                                data={"note": "no proposed output supplied"})

        findings = []
        if confidence_word(output):
            findings.append(reason(ReasonCode.COMMUNICATION_TONE_OVERCLAIM,
                                   self.name, "REVISE", "certainty_language"))

        binds_identity = contains_any(
            output, ["you are the only one", "this defines you", "your identity is"])
        if binds_identity:
            findings.append(reason(ReasonCode.COMMUNICATION_IDENTITY_BINDING,
                                   self.name, "BLOCK", "identity_binding"))
            return ModuleResult(self.name, "BLOCK", findings)

        # A very short output derived from a long source counts as lossy.
        is_short = len(output) < 25
        if is_short and task.metadata.get("source_detail_length", 0) > 500:
            findings.append(reason(ReasonCode.COMMUNICATION_COMPRESSION_LOSS,
                                   self.name, "REVISE", "severe_compression"))

        if not findings:
            return ModuleResult(self.name, "VALID")
        return ModuleResult(self.name, "REVISE", findings)
| # ── Layer 6: Evidence Validation ─────────────────────────────────────────── | |
class EvidenceValidationModule:
    """Layer 6: checks that claims in the output are backed by evidence.

    Returns "BLOCK" for unvalidated causal, certainty or dashboard-authority
    language, "CONSTRAIN" when only the baseline is missing, "RELEASE" when
    observations or external validation exist, and "HOLD" otherwise.
    """

    name = "evidence"

    def run(self, task: TaskInput) -> ModuleResult:
        """Compare ``task.proposed_output`` with ``task.evidence``."""
        output = task.proposed_output or ""
        evidence = task.evidence or {}

        externally_validated = bool(evidence.get("external_validation", False))
        baseline = evidence.get("baseline", None)
        observations = evidence.get("observations", [])

        blocking_checks = (
            (causal_word(output) and not evidence.get("causality_validated", False),
             ReasonCode.EVIDENCE_CAUSALITY_OVERCLAIM, "causality_without_mapping"),
            (confidence_word(output) and not externally_validated,
             ReasonCode.EVIDENCE_CONFIDENCE_AS_TRUTH, "confidence_as_truth"),
            (dashboard_authority_word(output) and not externally_validated,
             ReasonCode.EVIDENCE_DASHBOARD_AUTHORITY, "dashboard_or_status_as_authority"),
        )
        findings = [reason(code, self.name, "BLOCK", signal)
                    for triggered, code, signal in blocking_checks if triggered]
        bounds = []

        # Trend-style wording needs a baseline to compare against.
        trend_terms = ["trend", "stable", "improving", "weakening", "safe to automate"]
        if baseline is None and contains_any(output, trend_terms):
            findings.append(reason(ReasonCode.EVIDENCE_NO_BASELINE,
                                   self.name, "CONSTRAIN", "baseline_missing"))
            bounds.append(constraint(self.name, "confidence",
                                     "max_low_without_baseline",
                                     ReasonCode.EVIDENCE_NO_BASELINE))

        if any(item.severity == "BLOCK" for item in findings):
            return ModuleResult(self.name, "BLOCK", findings, bounds)
        if findings or bounds:
            return ModuleResult(self.name, "CONSTRAIN", findings, bounds)
        if observations or externally_validated:
            return ModuleResult(self.name, "RELEASE")
        return ModuleResult(self.name, "HOLD",
                            [reason(ReasonCode.EVIDENCE_NO_BASELINE, self.name, "HOLD",
                                    "no_observations_or_validation")])
| # ── Layer 7: Decision Aggregator ─────────────────────────────────────────── | |
# Module-level verdict strings grouped by the final decision they map to.
# A decision may appear in more than one group so that the resulting dict
# keeps its established key order.
_MODULE_STATUS_GROUPS = (
    (FinalDecision.AUTHORIZE,
     ("PROCEED", "READY", "PASS", "STABLE", "SAFE", "VALID", "RELEASE")),
    (FinalDecision.HOLD, ("DEFER",)),
    (FinalDecision.HALT, ("HALT",)),
    (FinalDecision.CLARIFY, ("FAIL", "CLARIFY")),
    (FinalDecision.CONSTRAIN, ("CONSTRAIN", "CONSTRAINED")),
    (FinalDecision.REVISE, ("REVISE",)),
    (FinalDecision.HOLD, ("HOLD", "UNSTABLE")),
    (FinalDecision.CONSTRAIN, ("RISK",)),
    (FinalDecision.BLOCK, ("CRITICAL", "BLOCK")),
)

# Lookup from a module's verdict string to the pipeline-level FinalDecision.
MODULE_TO_FINAL: Dict[str, FinalDecision] = {
    status: final_decision
    for final_decision, statuses in _MODULE_STATUS_GROUPS
    for status in statuses
}
# Constraint bounds ordered from least to most restrictive; bounds listed in
# the same tier share a rank.
_BOUND_TIERS = (
    ("full_output",),
    ("analysis_only",),
    ("analysis_only_no_persuasion", "separate_observation_and_interpretation"),
    ("max_medium_without_evidence", "max_low_without_baseline", "simulation_only"),
    ("no_execution",),
    ("BLOCK",),
)

# Rank of each constraint bound (higher = more restrictive), used when two
# constraints compete for the same scope.
RESTRICTIVENESS: Dict[str, int] = {
    bound: rank
    for rank, tier in enumerate(_BOUND_TIERS)
    for bound in tier
}
class DecisionAggregator:
    """Layer 7: fold all module results into one final decision."""

    name = "decision_aggregator"

    def aggregate(self, results: List[ModuleResult]) -> Dict[str, Any]:
        """Combine module results into a decision plus serialised details.

        The final decision is the highest-priority decision any module mapped
        to (unknown verdict strings count as HOLD). Merged constraints can
        then escalate it: a BLOCK-severity constraint forces BLOCK, and any
        non-INFO constraint turns a would-be AUTHORIZE into CONSTRAIN.

        Returns a dict with keys ``decision``, ``constraints``, ``reasons``
        and ``scope_violations``; the last three are lists of ``asdict``
        conversions.
        """
        collected_constraints: List[Constraint] = []
        collected_reasons: List[Reason] = []
        collected_violations: List[ScopeViolation] = []
        candidates = [FinalDecision.AUTHORIZE]

        for result in results:
            collected_constraints.extend(result.constraints)
            collected_reasons.extend(result.reasons)
            collected_violations.extend(result.scope_violations)
            candidates.append(MODULE_TO_FINAL.get(result.decision, FinalDecision.HOLD))

        # Priorities are unique per decision, so max() selects the same
        # winner as a running "strictly greater" comparison.
        final = max(candidates, key=lambda decision: FINAL_PRIORITY[decision])

        merged = self._merge_constraints(collected_constraints)
        severities = [item.severity for item in merged]
        if "BLOCK" in severities:
            final = FinalDecision.BLOCK
        elif final == FinalDecision.AUTHORIZE and any(s != "INFO" for s in severities):
            final = FinalDecision.CONSTRAIN

        return {
            "decision": final.value,
            "constraints": [asdict(item) for item in merged],
            "reasons": [asdict(item) for item in collected_reasons],
            "scope_violations": [asdict(item) for item in collected_violations],
        }

    @staticmethod
    def _merge_constraints(constraints: List[Constraint]) -> List[Constraint]:
        """Keep one constraint per scope, preferring the more restrictive bound.

        A constraint with an empty scope or bound is replaced by a
        BLOCK-severity "constraint_invalid_requires_revision" entry. When two
        constraints share a scope, the one whose bound ranks higher in
        RESTRICTIVENESS wins (unknown bounds rank 1); on a tie the later
        constraint replaces the earlier one.
        """
        by_scope: Dict[str, Constraint] = {}
        for candidate in constraints:
            if candidate.scope and candidate.bound:
                current = by_scope.get(candidate.scope)
                if current is None or (
                    RESTRICTIVENESS.get(candidate.bound, 1)
                    >= RESTRICTIVENESS.get(current.bound, 1)
                ):
                    by_scope[candidate.scope] = candidate
            else:
                # Malformed constraints get a synthetic key so that each one
                # is kept as its own entry.
                by_scope[f"invalid_{len(by_scope)}"] = Constraint(
                    candidate.source_module, "runtime",
                    "constraint_invalid_requires_revision",
                    ReasonCode.CONSTRAINT_INVALID, "BLOCK")
        return list(by_scope.values())
| # ── Pipeline runner ──────────────────────────────────────────────────────── | |
class LighthouseRunner:
    """
    Full pipeline: -1, 0, 1, 2, 3A, 3B, 3C, 4, 5, 6, 7

    Layer 3 sublayers run in order:
      3A StateValidationModule
      3B OrientationStateRegister (only gates on spatial tasks)
      3C NarrativeStateValidationModule (only gates on narrative tasks)

    When execution control returns HALT, every later stage is skipped and
    only the preflight and execution-control results reach the aggregator.
    """

    # Stage names reported in the receipt's execution trace, split at the
    # point where a HALT from execution control stops the pipeline.
    _ALWAYS_RUN_STAGES = ("preflight", "execution_control")
    _HALTABLE_STAGES = (
        "input_validation", "structure", "state",
        "orientation_state_register", "narrative_state_validation",
        "risk", "communication", "evidence",
    )
    _AGGREGATOR_STAGE = "decision_aggregator"

    def __init__(self) -> None:
        self.preflight = PreflightModule()
        self.execution = ExecutionControlModule()
        self.input_val = InputValidationModule()
        self.structure = StructuralValidationModule()
        self.state = StateValidationModule()
        self.osr = OrientationStateRegister()
        self.nsv = NarrativeStateValidationModule()
        self.risk = RiskAnalysisModule()
        self.communication = CommunicationValidationModule()
        self.evidence = EvidenceValidationModule()
        self.aggregator = DecisionAggregator()

    def run(self, task: TaskInput, context: Optional[RunContext] = None) -> Dict[str, Any]:
        """Run every stage on ``task`` and return the decision receipt.

        ``context`` defaults to a fresh ``RunContext`` when omitted.
        """
        context = context or RunContext()
        results = [self.preflight.run(task)]
        execution_result = self.execution.run(task)
        results.append(execution_result)
        if execution_result.decision != "HALT":
            results.append(self.input_val.run(task))
            results.append(self.structure.run(task))
            results.append(self.state.run(task, context))  # 3A
            results.append(self.osr.run(task))  # 3B
            results.append(self.nsv.run(task))  # 3C
            results.append(self.risk.run(task))
            results.append(self.communication.run(task))
            results.append(self.evidence.run(task))
        aggregate = self.aggregator.aggregate(results)
        return self._receipt(task, context, results, aggregate)

    def _receipt(self, task, context, results, aggregate) -> Dict[str, Any]:
        """Assemble the JSON-serialisable receipt for one pipeline run.

        Per-module entries hold the module's decision and reasons. The
        "state" module's data is flattened into its entry; any other module's
        non-empty data is nested under a "data" key.
        """
        module_map = {}
        for r in results:
            entry = {"decision": r.decision, "reasons": [asdict(x) for x in r.reasons]}
            if r.module == "state":
                entry.update(r.data)
            elif r.data:
                entry["data"] = r.data
            module_map[r.module] = entry

        execution_state = module_map.get("execution_control", {}).get("decision", "READY")

        # Report only the stages that actually executed: run() skips every
        # stage after execution control on HALT, so listing them in the trace
        # would claim validation that never happened.
        execution_trace = list(self._ALWAYS_RUN_STAGES)
        if execution_state != "HALT":
            execution_trace.extend(self._HALTABLE_STAGES)
        execution_trace.append(self._AGGREGATOR_STAGE)

        return {
            "system_version": SYSTEM_VERSION,
            "reason_registry_version": REASON_REGISTRY_VERSION,
            "input_id": task.metadata.get("input_id", f"input-{uuid.uuid4().hex[:10]}"),
            "timestamp": utc_now(),
            "execution_state": execution_state,
            "execution_trace": execution_trace,
            "run_context": asdict(context),
            "modules": module_map,
            "constraints": aggregate["constraints"],
            "decision": aggregate["decision"],
            "revise_trace": task.metadata.get("revise_trace", []),
            "scope_violations": aggregate["scope_violations"],
            # Placeholder record used when the caller supplied no override.
            "override": task.metadata.get("override", {
                "used": False, "tier": None, "authorized_by": None,
                "authorization_level": None, "reason": None,
                "timestamp": None, "ttl": None,
            }),
        }
| # ── Demo cases ───────────────────────────────────────────────────────────── | |
def demo_cases() -> Dict[str, TaskInput]:
    """Return the built-in demo tasks, keyed by the name accepted by ``--demo``.

    Every call builds fresh ``TaskInput`` objects and fresh nested payloads.
    """

    def cube_rotation(attachment: str) -> Dict[str, Any]:
        # Orientation payload shared by the two cube-rotation demos; only the
        # sphere's attachment to the cube differs between them.
        return {
            "orientation_state_id": "Y_CW_90_TOP_DOWN",
            "frame_id": "observer_global",
            "object_attachment": attachment,
            "object_state": {
                "object_id": "red_sphere",
                "coordinates": [1, 1, 1],
                "coordinate_frame": "observer_global",
            },
            "transform_event": {
                "axis": "Y",
                "rotation": "90_CW",
                "reference_view": "top_down",
            },
            "movement": {
                "axis": "local_x",
                "magnitude": 1,
            },
        }

    # Settings common to every demo except "risk".
    factual = {"intent": "analysis", "frame_type": "factual"}

    cases: Dict[str, TaskInput] = {}

    # -- Evidence / risk / state demos --
    cases["clean"] = TaskInput(
        prompt="Evaluate whether this statement can proceed.",
        proposed_output="The observed structure persisted across five windows with consistent topology.",
        evidence={
            "observations": ["structure present across five windows"],
            "external_validation": True,
            "baseline": "available",
            "confidence": "medium",
            "causality_validated": True,
        },
        **factual,
    )
    cases["overclaim"] = TaskInput(
        prompt="Evaluate this output.",
        proposed_output="The system clearly confirms a stable trend because the dashboard color is green.",
        evidence={
            "observations": ["dashboard_color=green"],
            "external_validation": False,
            "baseline": None,
            "confidence": "low",
            "causality_validated": False,
        },
        **factual,
    )
    cases["risk"] = TaskInput(
        prompt="Should the system proceed with an irreversible migration?",
        proposed_output="Proceed with the migration.",
        intent="analysis",
        frame_type="operational",
        known_unknowns=["rollback feasibility", "dependency map"],
        risk={
            "irreversible": True,
            "cascading_failure_surface": True,
            "mitigation_available": False,
            "evidence_confidence": "low",
            "external_users_affected": True,
        },
    )
    cases["drift"] = TaskInput(
        prompt="Calculate state stability.",
        proposed_output="State drift is constrained.",
        state={
            "invariant_violations": 1,
            "boundary_crossings": 1,
            "assumption_changes": 2,
            "inconsistencies": 1,
            "prediction_divergence": True,
            "shadow_state_available": False,
            "requires_shadow_state": True,
        },
        evidence={"observations": ["state signals provided"], "baseline": "declared"},
        **factual,
    )

    # -- Spatial orientation demos --
    cases["spatial_ambiguous"] = TaskInput(
        prompt="A wheel spins clockwise. Which way is it rotating?",
        orientation={
            "frame_id": "observer_unspecified",
        },
        **factual,
    )
    cases["spatial_locked"] = TaskInput(
        prompt="Viewed from above, a sphere starts at (1,1,1). The cube rotates 90 degrees clockwise.",
        orientation=cube_rotation("independent_of_cube"),
        **factual,
    )
    cases["spatial_attached"] = TaskInput(
        prompt="Sphere at (1,1,1) is attached to the cube. Cube rotates 90 degrees CW top-down.",
        orientation=cube_rotation("attached_to_cube"),
        **factual,
    )

    # -- Narrative interpretation demos --
    cases["narrative_drift"] = TaskInput(
        prompt="Evaluate this interpretation update.",
        narrative={
            "observed_input": "The dashboard is green.",
            "initial_interpretation": "The system may be operating normally.",
            "updated_interpretation": "The system is confirmed stable.",
            "new_constraints": [],
            "evidence_delta": [],
            "confidence_before": "low",
            "confidence_after": "high",
        },
        **factual,
    )
    cases["narrative_retro"] = TaskInput(
        prompt="Evaluate whether this memory reconstruction is valid.",
        narrative={
            "observed_input": "Pattern A was observed three times.",
            "initial_interpretation": "Pattern A may indicate instability.",
            "updated_interpretation": "We always knew Pattern A was the root cause.",
            "new_constraints": [],
            "evidence_delta": [],
            "confidence_before": "medium",
            "confidence_after": "high",
            "retroactive_rewrite": True,
        },
        **factual,
    )
    cases["narrative_clean"] = TaskInput(
        prompt="Evaluate this properly attributed interpretation update.",
        narrative={
            "observed_input": "System logged 5 errors in window 3.",
            "initial_interpretation": "Possible instability in module X.",
            "updated_interpretation": "Confirmed instability in module X.",
            "new_constraints": ["baseline comparison now available"],
            "evidence_delta": ["external_audit_confirmed_module_X_failure"],
            "confidence_before": "low",
            "confidence_after": "high",
            "retroactive_rewrite": False,
        },
        **factual,
    )

    return cases
def task_from_json(path: str) -> TaskInput:
    """Load a ``TaskInput`` from a UTF-8 encoded JSON file.

    The file must contain a single JSON object; its members are passed to
    ``TaskInput`` as keyword arguments.

    Raises:
        OSError: the file cannot be opened.
        json.JSONDecodeError: the file is not valid JSON.
        TypeError: the top-level JSON value is not an object.

    Any error raised by the ``TaskInput`` constructor for the given keys
    propagates unchanged.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Fail with an actionable message instead of the opaque "argument after
    # ** must be a mapping" error that unpacking a list or scalar produces.
    if not isinstance(data, dict):
        raise TypeError(
            f"{path}: task input must be a JSON object, got {type(data).__name__}"
        )
    return TaskInput(**data)
def main() -> None:
    """Command-line entry point: run one task and print its receipt as JSON.

    ``--input`` takes precedence over ``--demo``; when neither is given the
    "overclaim" demo is run.
    """
    demos = demo_cases()

    parser = argparse.ArgumentParser(description="Public Lighthouse Core v1.7 reference runner")
    parser.add_argument("--demo", choices=list(demos), help="Run a built-in demo case")
    parser.add_argument("--input", help="Path to JSON task input")
    parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON")
    args = parser.parse_args()

    if args.input:
        task = task_from_json(args.input)
    else:
        task = demos[args.demo or "overclaim"]

    receipt = LighthouseRunner().run(task)
    indent = 2 if args.pretty else None
    print(json.dumps(receipt, indent=indent))


if __name__ == "__main__":
    main()
