#!/usr/bin/env python3
"""
PUBLIC LIGHTHOUSE CORE v1.7
AI Output Governance and Validation Runtime

Adds two new sublayers to Layer 3 (State Validation):
  3A State Validation (drift scoring)   — unchanged from v1.6
  3B Orientation State Register (OSR)   — spatial frame / basis tracking
  3C Narrative State Validation (NSV)   — interpretation drift tracking

All other modules and the aggregator are unchanged from v1.6.

Run:
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo clean --pretty
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo overclaim --pretty
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_ambiguous --pretty
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo spatial_locked --pretty
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_drift --pretty
  python PUBLIC_LIGHTHOUSE_CORE_v1_7.py --demo narrative_retro --pretty
"""

# The future statement must stay directly below the module docstring.
from __future__ import annotations

import argparse
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
import json
from typing import Any, Dict, List, Optional
import uuid

SYSTEM_VERSION = "PUBLIC_LIGHTHOUSE_CORE_1.7.0"
REASON_REGISTRY_VERSION = "1.1.0"


# ── Decision types ──────────────────────────────────────────────────────────

class FinalDecision(str, Enum):
    """Closed set of decisions the pipeline can emit.

    Members are also ``str`` instances, so each compares equal to its value.
    """

    AUTHORIZE = "AUTHORIZE"
    CONSTRAIN = "CONSTRAIN"
    REVISE = "REVISE"
    HOLD = "HOLD"
    CLARIFY = "CLARIFY"
    HALT = "HALT"
    BLOCK = "BLOCK"


# Relative rank of each final decision; a larger number outranks a smaller one.
FINAL_PRIORITY = {
    FinalDecision.BLOCK: 7,
    FinalDecision.HALT: 6,
    FinalDecision.CLARIFY: 5,
    FinalDecision.HOLD: 4,
    FinalDecision.REVISE: 3,
    FinalDecision.CONSTRAIN: 2,
    FinalDecision.AUTHORIZE: 1,
}


# ── Reason codes ────────────────────────────────────────────────────────────

class ReasonCode:
    """Registry of reason-code strings, grouped by the layer that emits them.

    The string values are runtime output and are reproduced exactly as found.
    """

    # Preflight
    PREFLIGHT_PURPOSE_MISSING = "PREFLIGHT_PURPOSE_MISSING"
    PREFLIGHT_BIASED_FRAME = "PREFLIGHT_BIASED_FRAME"
    PREFLIGHT_FORCED_CONCLUSION = "PREFLIGHT_FORCED_CONCLUSION"

    # Execution Control
    EXECUTION_AGENT_UNVERIFIABLE = "EXECUTION_AGENT_UNVERIFIABLE"
    EXECUTION_SCOPE_VIOLATION = "EXECUTION_SCOPE_VIOLATION"
    EXECUTION_PERMISSION_MISSING = "EXECUTION_PERMISSION_MISSING"
    EXECUTION_STALE_HALT = "EXECUTION_STALE_HALT"

    # Input Validation
    INPUT_FRAME_NOT_DECLARED = "INPUT_FRAME_NOT_DECLARED"
    INPUT_KNOWN_UNKNOWNS_MISSING = "INPUT_KNOWN_UNKNOWNS_MISSING"
    INPUT_IMPOSSIBLE_REQUEST = "INPUT_IMPOSSIBLE_REQUEST"

    # Structural Validation
    STRUCTURE_CHAIN_NOT_REPLAYABLE = "STRUCTURE_CHAIN_NOT_REPLAYABLE"
    STRUCTURE_TERMINAL_STATE_MISMATCH = "STRUCTURE_TERMINAL_STATE_MISMATCH"
    STRUCTURE_STEP_INVALID = "STRUCTURE_STEP_INVALID"

    # State Validation (3A)
    STATE_INVARIANT_VIOLATION = "STATE_INVARIANT_VIOLATION"
    STATE_BOUNDARY_CROSSING = "STATE_BOUNDARY_CROSSING"
    STATE_SHADOW_UNAVAILABLE = "STATE_SHADOW_UNAVAILABLE"
    STATE_IDENTITY_LOST = "STATE_IDENTITY_LOST"

    # Orientation State Register (3B)
    ORIENTATION_STATE_MISSING = "ORIENTATION_STATE_MISSING"
    ORIENTATION_BASIS_UNDECLARED = "ORIENTATION_BASIS_UNDECLARED"
    ORIENTATION_BASIS_MISMATCH = "ORIENTATION_BASIS_MISMATCH"
    ORIENTATION_ATTACHMENT_UNDECLARED = "ORIENTATION_ATTACHMENT_UNDECLARED"
    ORIENTATION_TRANSFORM_UNSUPPORTED = "ORIENTATION_TRANSFORM_UNSUPPORTED"
    ORIENTATION_FRAME_DRIFT = "ORIENTATION_FRAME_DRIFT"
    # NOTE(review): the emitted string differs from the attribute name
    # ("..._APPLIED_..."); left untouched because consumers may key on it.
    ORIENTATION_MOVEMENT_BEFORE_ROTATION = "ORIENTATION_MOVEMENT_APPLIED_BEFORE_ROTATION"

    # Narrative State Validation (3C)
    # NOTE(review): the emitted string ends in "MERGE", the attribute name in
    # "MERGED"; left untouched because consumers may key on it.
    NARRATIVE_OBSERVATION_INTERPRETATION_MERGED = "NARRATIVE_OBSERVATION_INTERPRETATION_MERGE"
    NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED = "NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED"
    NARRATIVE_REINTERPRETATION_UNATTRIBUTED = "NARRATIVE_REINTERPRETATION_UNATTRIBUTED"
    NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED = "NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED"
    RETROACTIVE_NARRATIVE_COLLAPSE = "RETROACTIVE_NARRATIVE_COLLAPSE"

    # Risk Analysis
    RISK_IRREVERSIBLE_LOW_CONFIDENCE = "RISK_IRREVERSIBLE_LOW_CONFIDENCE"
    RISK_CASCADING_FAILURE_SURFACE = "RISK_CASCADING_FAILURE_SURFACE"
    RISK_FRAGILE_ASSUMPTION = "RISK_FRAGILE_ASSUMPTION"
    RISK_UNRESOLVED_CRITICAL = "RISK_UNRESOLVED_CRITICAL"

    # Communication Validation
    COMMUNICATION_TONE_OVERCLAIM = "COMMUNICATION_TONE_OVERCLAIM"
    COMMUNICATION_COMPRESSION_LOSS = "COMMUNICATION_COMPRESSION_LOSS"
    COMMUNICATION_IDENTITY_BINDING = "COMMUNICATION_IDENTITY_BINDING"
    COMMUNICATION_FALSE_CERTAINTY = "COMMUNICATION_FALSE_CERTAINTY"

    # Evidence Validation
    EVIDENCE_CAUSALITY_OVERCLAIM = "EVIDENCE_CAUSALITY_OVERCLAIM"
    EVIDENCE_CONFIDENCE_AS_TRUTH = "EVIDENCE_CONFIDENCE_AS_TRUTH"
    EVIDENCE_DASHBOARD_AUTHORITY = "EVIDENCE_DASHBOARD_AUTHORITY"
    EVIDENCE_NO_BASELINE = "EVIDENCE_NO_BASELINE"

    # System
    SCOPE_VIOLATION_MODULE_OVERREACH = "SCOPE_VIOLATION_MODULE_OVERREACH"
    REVISE_LOOP_UNRESOLVED = "REVISE_LOOP_UNRESOLVED"
    CONSTRAINT_INVALID = "CONSTRAINT_INVALID"


# ── Data contracts ──────────────────────────────────────────────────────────

@dataclass
class Reason:
    """One finding produced by a module."""

    code: str                 # a ReasonCode string
    module: str               # ``name`` of the module that produced the finding
    severity: str             # severity label chosen by the producing module
    signal_source: str = ""   # short machine-readable tag for the trigger
    description: str = ""     # optional human-readable explanation
@dataclass
class Constraint:
    """A bound that a module places on the final output."""

    source_module: str   # ``name`` of the module that issued the constraint
    scope: str           # what is being bounded, e.g. "output_type", "confidence"
    bound: str           # the limit applied within that scope
    reason_code: str     # ReasonCode string justifying the constraint
    severity: str = field(default="CONSTRAIN")


@dataclass
class ScopeViolation:
    """Record of a module acting outside its own responsibility."""

    module: str           # module that overreached
    violation: str        # what it did
    correct_module: str   # module that should have handled it
    reason_code: str = field(default=ReasonCode.SCOPE_VIOLATION_MODULE_OVERREACH)


@dataclass
class ModuleResult:
    """Outcome of running one pipeline module."""

    module: str     # ``name`` of the module that produced this result
    decision: str   # module-level decision label
    reasons: List[Reason] = field(default_factory=list)
    constraints: List[Constraint] = field(default_factory=list)
    scope_violations: List[ScopeViolation] = field(default_factory=list)
    data: Dict[str, Any] = field(default_factory=dict)   # free-form diagnostics
@dataclass
class RunContext:
    """Per-run settings handed to modules alongside the task.

    NOTE(review): within this part of the file only ``prediction_model`` is
    read (by the state validation layer); the meaning of the other fields is
    presumably defined by the pipeline runner — confirm there.
    """

    pass_number: int = field(default=1)
    max_passes: int = field(default=2)
    revise_loop_active: bool = field(default=False)
    prior_stable_state_id: Optional[str] = field(default=None)
    # "none" disables the prediction-divergence term of the drift score.
    prediction_model: str = field(default="none")
    halt_resumed: bool = field(default=False)
def _default_permission_envelope() -> Dict[str, Any]:
    """Build a fresh local, analyze/validate-only permission envelope."""
    envelope: Dict[str, Any] = {}
    envelope["declared_capabilities"] = ["analyze", "validate"]
    envelope["declared_authority_boundary"] = "local"
    envelope["allowed_actions"] = ["analyze", "validate"]
    envelope["resource_scope"] = "local"
    return envelope


@dataclass
class TaskInput:
    """Everything the pipeline knows about one request under review.

    Only ``prompt`` is required. Each instance gets its own copy of every
    mutable default.
    """

    prompt: str
    proposed_output: str = field(default="")
    agent_id: Optional[str] = field(default="local_user")
    permission_envelope: Optional[Dict[str, Any]] = field(
        default_factory=_default_permission_envelope)
    action: str = field(default="analyze")
    intent: Optional[str] = field(default=None)
    frame_type: Optional[str] = field(default=None)
    known_unknowns: List[str] = field(default_factory=list)
    evidence: Dict[str, Any] = field(default_factory=dict)
    transform_chain: List[Dict[str, Any]] = field(default_factory=list)
    state: Dict[str, Any] = field(default_factory=dict)
    # OSR contract (Layer 3B); None means the task is not spatial.
    orientation: Optional[Dict[str, Any]] = field(default=None)
    # NSV contract (Layer 3C); None means the task is not narrative.
    narrative: Optional[Dict[str, Any]] = field(default=None)
    risk: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
# ── Helpers ─────────────────────────────────────────────────────────────────
def utc_now() -> str:
    """Return the current time in UTC as an ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def reason(code, module, severity, signal_source="", description="") -> Reason:
    """Build a ``Reason`` record; short factory used by every module."""
    # Arguments are passed positionally in the dataclass's field order.
    return Reason(code, module, severity, signal_source, description)
def constraint(module, scope, bound, code, severity="CONSTRAIN") -> Constraint:
    """Build a ``Constraint`` record issued by *module*."""
    # Arguments are passed positionally in the dataclass's field order:
    # source_module, scope, bound, reason_code, severity.
    return Constraint(module, scope, bound, code, severity)
def contains_any(text: str, terms: List[str]) -> bool:
    """Return True if any of *terms* occurs in *text*, ignoring case.

    This is a plain substring test: a term also matches inside a longer word.
    """
    haystack = text.lower()
    for term in terms:
        if term.lower() in haystack:
            return True
    return False
def _any_term_starts_word(text: str, terms: List[str]) -> bool:
    """Return True if any term occurs in *text* at the start of a word.

    Matching ignores case. A term may be followed by more letters, so
    "certain" matches "certainly", but it may not be preceded by a word
    character, so it does not match "uncertain".
    """
    import re  # local import keeps this block self-contained

    return any(
        re.search(r"(?<!\w)" + re.escape(term), text, flags=re.IGNORECASE)
        for term in terms
    )


def confidence_word(text: str) -> bool:
    """Return True when *text* uses certainty language.

    Terms are matched at word starts only. A raw substring test also fired
    on "uncertain", "unconfirmed", "whenever" (contains "never") and
    "improves" (contains "proves"), flagging hedged or neutral wording as
    an overclaim.
    """
    return _any_term_starts_word(
        text,
        ["clearly", "definitely", "proves", "confirmed",
         "guaranteed", "always", "never", "certain"],
    )


def causal_word(text: str) -> bool:
    """Return True when *text* uses causal language.

    Terms are matched at word starts only, so "improves" is no longer read
    as the causal claim "proves".
    """
    return _any_term_starts_word(
        text,
        ["because", "caused", "causes", "proves", "driven by", "confirms"],
    )
def dashboard_authority_word(text: str) -> bool:
    """Return True when *text* contains dashboard/status-authority vocabulary.

    Uses the case-insensitive substring test of ``contains_any``.
    """
    return contains_any(text, ["green", "score", "rank", "safe to automate",
                               "trustworthy", "authority"])


# ── Layer -1: Preflight ─────────────────────────────────────────────────────

class PreflightModule:
    """Layer -1: checks the prompt's purpose and framing.

    Outcomes, most severe first:
      BLOCK     — the prompt demands a forced conclusion
      CLARIFY   — no intent was declared and none could be inferred
      CONSTRAIN — the prompt is framed persuasively
      PROCEED   — none of the above
    """

    name = "preflight"

    def run(self, task: TaskInput) -> ModuleResult:
        """Evaluate ``task.prompt``; may set ``task.intent`` in place.

        Checks run in order of severity so that a milder finding can never
        mask a forced-conclusion BLOCK.
        """
        text = task.prompt or ""

        # Infer a missing intent from analysis-style wording (mutates task).
        if not task.intent and contains_any(
                text, ["evaluate", "analyze", "calculate", "what is",
                       "should", "question"]):
            task.intent = "analysis"

        if contains_any(text, ["ignore evidence", "force conclusion", "must conclude"]):
            return ModuleResult(self.name, "BLOCK",
                                [reason(ReasonCode.PREFLIGHT_FORCED_CONCLUSION,
                                        self.name, "BLOCK", "forced_conclusion")])

        if not task.intent:
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.PREFLIGHT_PURPOSE_MISSING,
                                        self.name, "CLARIFY", "intent_missing")])

        if contains_any(text, ["convince me", "prove that",
                               "show why this is true", "make them believe"]):
            return ModuleResult(self.name, "CONSTRAIN",
                                [reason(ReasonCode.PREFLIGHT_BIASED_FRAME,
                                        self.name, "CONSTRAIN",
                                        "steered_or_persuasive_frame")],
                                [constraint(self.name, "output_type",
                                            "analysis_only_no_persuasion",
                                            ReasonCode.PREFLIGHT_BIASED_FRAME)])

        return ModuleResult(self.name, "PROCEED")


# ── Layer 0: Execution Control ──────────────────────────────────────────────

class ExecutionControlModule:
    """Layer 0: checks who is acting and whether the action is permitted."""

    name = "execution_control"

    def run(self, task: TaskInput) -> ModuleResult:
        """Return READY, or HALT / DEFER / BLOCK with the failing reason.

        HALT  — ``agent_id`` is missing.
        DEFER — ``permission_envelope`` is missing or empty.
        BLOCK — the action is not in ``allowed_actions``, the resource scope
                is not one of local/system/external, or a "local" authority
                boundary is paired with an "external" resource scope.
        """
        if not task.agent_id:
            return ModuleResult(self.name, "HALT",
                                [reason(ReasonCode.EXECUTION_AGENT_UNVERIFIABLE,
                                        self.name, "HALT", "missing_agent_id")])

        envelope = task.permission_envelope
        if not envelope:
            return ModuleResult(self.name, "DEFER",
                                [reason(ReasonCode.EXECUTION_PERMISSION_MISSING,
                                        self.name, "DEFER",
                                        "missing_permission_envelope")])

        # ``or []`` tolerates an explicit None so the action is simply denied.
        allowed = set(envelope.get("allowed_actions") or [])
        scope = envelope.get("resource_scope", "")
        boundary = envelope.get("declared_authority_boundary", "")

        if task.action not in allowed:
            return self._blocked("action_not_allowed")
        if scope not in ("local", "system", "external"):
            return self._blocked("invalid_resource_scope")
        if boundary == "local" and scope == "external":
            return self._blocked("outside_authority_boundary")

        return ModuleResult(self.name, "READY")

    def _blocked(self, signal_source: str) -> ModuleResult:
        """Build the BLOCK result shared by all scope violations."""
        return ModuleResult(self.name, "BLOCK",
                            [reason(ReasonCode.EXECUTION_SCOPE_VIOLATION,
                                    self.name, "BLOCK", signal_source)])


# ── Layer 1: Input Validation ───────────────────────────────────────────────

class InputValidationModule:
    """Layer 1: checks that the prompt is usable and its frame is declared."""

    name = "input_validation"

    def run(self, task: TaskInput) -> ModuleResult:
        """Validate the prompt; may set ``task.frame_type`` in place.

        CLARIFY — the prompt is empty, or a speculative/operational frame
                  declares no known unknowns.
        BLOCK   — the prompt contains a listed impossible request.
        PROCEED — otherwise.
        """
        text = task.prompt or ""

        if not text.strip():
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.INPUT_FRAME_NOT_DECLARED,
                                        self.name, "CLARIFY", "empty_prompt")])

        if contains_any(text, ["square circle", "rotate 90 in two directions at once"]):
            return ModuleResult(self.name, "BLOCK",
                                [reason(ReasonCode.INPUT_IMPOSSIBLE_REQUEST,
                                        self.name, "BLOCK", "impossible_request")])

        # Default the frame when the caller did not declare one (mutates task).
        if not task.frame_type:
            task.frame_type = "factual" if "what is" in text.lower() else "analytical"

        if task.frame_type in ("speculative", "operational") and not task.known_unknowns:
            return ModuleResult(self.name, "CLARIFY",
                                [reason(ReasonCode.INPUT_KNOWN_UNKNOWNS_MISSING,
                                        self.name, "CLARIFY",
                                        "known_unknowns_missing")])

        return ModuleResult(self.name, "PROCEED")


# ── Layer 2: Structural Validation ──────────────────────────────────────────

class StructuralValidationModule:
    """Layer 2: checks that the declared transform chain can be replayed."""

    name = "structure"

    def run(self, task: TaskInput) -> ModuleResult:
        """Walk ``task.transform_chain`` and verify that it links up.

        PASS  — no chain was supplied, or every step is complete and
                consistent with its predecessor.
        FAIL  — a step lacks a required field, or its ``state_before``
                differs from the previous step's ``state_after``.
        BLOCK — ``metadata["claimed_final_state"]`` is given and differs
                from the last step's ``state_after``.
        """
        chain = task.transform_chain
        if not chain:
            return ModuleResult(self.name, "PASS",
                                data={"replayable": None,
                                      "note": "no explicit transform chain supplied"})

        previous_after = None
        for i, step in enumerate(chain):
            missing = [k for k in ("operation", "state_before", "state_after")
                       if k not in step]
            if missing:
                return ModuleResult(self.name, "FAIL",
                                    [reason(ReasonCode.STRUCTURE_CHAIN_NOT_REPLAYABLE,
                                            self.name, "FAIL", f"missing_step_{i}")],
                                    data={"missing_step_index": i,
                                          "missing_fields": missing})
            if previous_after is not None and step["state_before"] != previous_after:
                return ModuleResult(self.name, "FAIL",
                                    [reason(ReasonCode.STRUCTURE_STEP_INVALID,
                                            self.name, "FAIL",
                                            f"step_{i}_before_mismatch")])
            previous_after = step["state_after"]

        claimed = task.metadata.get("claimed_final_state")
        if claimed is not None and previous_after is not None and claimed != previous_after:
            return ModuleResult(self.name, "BLOCK",
                                [reason(ReasonCode.STRUCTURE_TERMINAL_STATE_MISMATCH,
                                        self.name, "BLOCK",
                                        "claimed_final_state_mismatch")])

        return ModuleResult(self.name, "PASS", data={"replayable": True})
# ── Layer 3A: State Validation ──────────────────────────────────────────────

class StateValidationModule:
    """Layer 3A: turns declared state signals into a drift score in [0, 1].

    Weights: invariant 0.30, boundary 0.25, assumption 0.15,
    inconsistency 0.20, prediction 0.10, plus a 0.05 penalty when a required
    shadow state is unavailable. The sum is capped at 1.0.
    """

    name = "state"

    def run(self, task: TaskInput, context: RunContext) -> ModuleResult:
        """Score ``task.state`` and classify its stability.

        Returns BLOCK when ``identity_lost`` is set; otherwise UNSTABLE
        (drift >= 0.70), CONSTRAINED (drift >= 0.30) or STABLE.
        """
        state = task.state or {}
        iv = self._count(state, "invariant_violations")
        bc = self._count(state, "boundary_crossings")
        ac = self._count(state, "assumption_changes")
        ic = self._count(state, "inconsistencies")
        # Prediction divergence only counts when a prediction model is active.
        pd = bool(state.get("prediction_divergence", False)) and context.prediction_model != "none"
        shadow_avail = bool(state.get("shadow_state_available", False))
        needs_shadow = bool(state.get("requires_shadow_state", False))

        inv_c = 0.30 * (1 if iv > 0 else 0)
        bnd_c = 0.25 * min(bc / 2, 1.0)
        asm_c = 0.15 * min(ac / 3, 1.0)
        inc_c = 0.20 * min(ic / 2, 1.0)
        prd_c = 0.10 * (1 if pd else 0)
        shd_p = 0.05 if needs_shadow and not shadow_avail else 0.0
        drift = min(inv_c + bnd_c + asm_c + inc_c + prd_c + shd_p, 1.0)

        reasons = []
        if iv:
            reasons.append(reason(ReasonCode.STATE_INVARIANT_VIOLATION,
                                  self.name, "CONSTRAIN", "invariant_violations"))
        if bc:
            reasons.append(reason(ReasonCode.STATE_BOUNDARY_CROSSING,
                                  self.name, "CONSTRAIN", "boundary_crossings"))
        if needs_shadow and not shadow_avail:
            reasons.append(reason(ReasonCode.STATE_SHADOW_UNAVAILABLE,
                                  self.name, "INFO", "shadow_state_unavailable"))

        if state.get("identity_lost", False):
            reasons.append(reason(ReasonCode.STATE_IDENTITY_LOST,
                                  self.name, "BLOCK", "identity_lost"))
            return ModuleResult(self.name, "BLOCK", reasons,
                                data=self._data(drift, inv_c, bnd_c, asm_c,
                                                inc_c, prd_c, "UNSTABLE", shd_p))

        if drift >= 0.70:
            cl = "UNSTABLE"
        elif drift >= 0.30:
            cl = "CONSTRAINED"
        else:
            cl = "STABLE"
        return ModuleResult(self.name, cl, reasons,
                            data=self._data(drift, inv_c, bnd_c, asm_c,
                                            inc_c, prd_c, cl, shd_p))

    @staticmethod
    def _count(state: Dict[str, Any], key: str) -> int:
        """Read a non-negative integer counter from *state*.

        A missing or None value counts as 0. Negative values are clamped to
        0 so that they cannot subtract from the drift score and offset other
        signals.
        """
        return max(int(state.get(key) or 0), 0)

    @staticmethod
    def _data(score, inv, bnd, asm, inc, prd, cl, shd=0.0) -> Dict[str, Any]:
        """Assemble the diagnostic payload, rounding each number to 4 places.

        ``shd`` is the shadow-state penalty. It is reported with the other
        components so that they account for the whole pre-cap score.
        """
        return {
            "drift_score": round(score, 4),
            "drift_components": {
                "invariant": round(inv, 4),
                "boundary": round(bnd, 4),
                "assumption": round(asm, 4),
                "inconsistency": round(inc, 4),
                "prediction": round(prd, 4),
                "shadow": round(shd, 4),
            },
            "stability_classification": cl,
        }


# ── Layer 3B: Orientation State Register ────────────────────────────────────
# Standard basis lookup table (top-down / right-hand convention)
_GLOBAL_AXIS_NAMES = ("X", "Y", "Z")


def _global_axis_label(vector: List[int]) -> str:
    """Name the signed global axis that a unit basis vector points along."""
    index = next(i for i, component in enumerate(vector) if component)
    sign = "+" if vector[index] > 0 else "-"
    return f"global_{sign}{_GLOBAL_AXIS_NAMES[index]}"


def _orientation_entry(x: List[int], y: List[int], z: List[int]) -> Dict[str, Any]:
    """Build one table row; the labels are derived from the basis vectors."""
    return {
        "local_x": _global_axis_label(x),
        "local_y": _global_axis_label(y),
        "local_z": _global_axis_label(z),
        "basis_vector": {"x": x, "y": y, "z": z},
    }


# Each row lists where the local x, y and z axes point in global coordinates.
ORIENTATION_TABLE: Dict[str, Dict[str, Any]] = {
    "IDENTITY": _orientation_entry([1, 0, 0], [0, 1, 0], [0, 0, 1]),
    "Y_CW_90_TOP_DOWN": _orientation_entry([0, 0, -1], [0, 1, 0], [1, 0, 0]),
    "Y_CCW_90_TOP_DOWN": _orientation_entry([0, 0, 1], [0, 1, 0], [-1, 0, 0]),
    "Y_180": _orientation_entry([-1, 0, 0], [0, 1, 0], [0, 0, -1]),
    "X_CW_90_RIGHT_SIDE": _orientation_entry([1, 0, 0], [0, 0, -1], [0, 1, 0]),
    "Z_CW_90_TOP_DOWN": _orientation_entry([0, 1, 0], [-1, 0, 0], [0, 0, 1]),
}
| def _dot_add(vec: List[float], pos: List[float]) -> List[float]: | |
| return [round(pos[i] + vec[i], 6) for i in range(3)] | |
| class OrientationStateRegister: | | | """ | | | Layer 3B β Orientation State Register (OSR) | | | Validates and applies spatial transforms using an indexed orientation table. | | | No unlabeled orientation. No freehand axis guessing. | | | Transform must be applied before movement. | | | Object attachment must be declared. | | | If task.orientation is None: module passes through (non-spatial tasks unaffected). | | | """ | | | name = "orientation_state_register" | | | def run(self, task: TaskInput) -> ModuleResult: | | | o = task.orientation | | | if o is None: | | | return ModuleResult(self.name, "PASS", | |
| data={"note": "no orientation contract supplied; non-spatial task"}) | |
| reasons = [] | |
| # Rule 1: orientation_state_id required | | | state_id = o.get("orientation_state_id") | | | if not state_id: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_STATE_MISSING, | | | self.name, "CLARIFY", "missing_orientation_state_id")], | | | data={"orientation_table_keys": list(ORIENTATION_TABLE.keys())}) | | | # Rule 2: state_id must be in the table | | | if state_id not in ORIENTATION_TABLE: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_TRANSFORM_UNSUPPORTED, | |
| self.name, "CLARIFY", f"state_id_not_in_table: {state_id}")], | |
| data={"valid_ids": list(ORIENTATION_TABLE.keys())}) | |
| basis = ORIENTATION_TABLE[state_id] | |
| # Rule 3: frame_id required | | | if not o.get("frame_id"): | | | reasons.append(reason(ReasonCode.ORIENTATION_BASIS_UNDECLARED, | | | self.name, "CLARIFY", "missing_frame_id")) | | | # Rule 4: object_attachment required if movement is declared | | | movement = o.get("movement_event") or o.get("movement") | | | if movement and "object_attachment" not in o: | | | return ModuleResult(self.name, "CLARIFY", | | | [reason(ReasonCode.ORIENTATION_ATTACHMENT_UNDECLARED, | | | self.name, "CLARIFY", "object_attachment_missing")], | |
| data={ | |
| "explanation": ( | |
| "Two valid cases exist: " | | | "(A) independent object: move along rotated local axis from current position. " | | | "(B) attached object: rotate object position first, then apply local movement." | | | ) | | | }) | | | # Rule 5: movement_before_rotation check | |
| transform = o.get("transform_event") | |
| if o.get("movement_applied_before_rotation", False): | |
| reasons.append(reason(ReasonCode.ORIENTATION_MOVEMENT_BEFORE_ROTATION, | | | self.name, "BLOCK", "movement_before_rotation")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | # Rule 6: basis mismatch check | | | declared_basis = o.get("basis_map") or o.get("local_basis") | | | if declared_basis: | |
| for axis in ("local_x", "local_y", "local_z"): | |
| declared_val = declared_basis.get(axis, "").replace(" ", "") | |
| table_val = basis.get(axis, "").replace(" ", "") | |
| if declared_val and table_val and declared_val.upper() != table_val.upper(): | |
| reasons.append(reason(ReasonCode.ORIENTATION_BASIS_MISMATCH, | | | self.name, "REVISE", | |
| f"{axis}_declared={declared_val}_table={table_val}")) | |
| if any(r.severity == "BLOCK" for r in reasons): | |
| return ModuleResult(self.name, "BLOCK", reasons, data={"registered_basis": basis}) | |
| if any(r.code == ReasonCode.ORIENTATION_BASIS_MISMATCH for r in reasons): | |
| return ModuleResult(self.name, "REVISE", reasons, data={"registered_basis": basis}) | |
| if reasons: | | | return ModuleResult(self.name, "CLARIFY", reasons, data={"registered_basis": basis}) | | | # Compute final position if object_state and movement are provided | |
| computed = {} | |
| obj = o.get("object_state", {}) | |
| if obj and movement and "object_attachment" in o: | |
| start = obj.get("coordinates", []) | |
| attachment = o["object_attachment"] | |
| mv_axis = movement.get("axis", "") | |
| mv_mag = float(movement.get("magnitude", 0)) | |
| bv = basis.get("basis_vector", {}) | |
| axis_vec = bv.get(mv_axis.replace("local_", "").lower(), []) | |
| if start and axis_vec: | | | if attachment == "attached_to_cube" and transform: | | | rot_id = state_id | |
| if rot_id == "Y_CW_90_TOP_DOWN": | |
| rx = start[2] | |
| ry = start[1] | |
| rz = -start[0] | |
| rotated = [rx, ry, rz] | |
| elif rot_id == "Y_CCW_90_TOP_DOWN": | |
| rx = -start[2] | |
| ry = start[1] | |
| rz = start[0] | |
| rotated = [rx, ry, rz] | |
| elif rot_id == "Y_180": | |
| rotated = [-start[0], start[1], -start[2]] | |
| else: | |
| rotated = list(start) | |
| move_vec = [v * mv_mag for v in axis_vec] | |
| final = _dot_add(move_vec, rotated) | |
| computed = {"case": "B_attached", "rotated_first": rotated, "final": final} | |
| else: | |
| move_vec = [v * mv_mag for v in axis_vec] | |
| final = _dot_add(move_vec, start) | |
| computed = {"case": "A_independent", "final": final} | |
| return ModuleResult(self.name, "PASS", | | | data={"registered_basis": basis, "computed": computed or None}) | | | # ββ Layer 3C: Narrative State Validation βββββββββββββββββββββββββββββββββββ | | | class NarrativeStateValidationModule: | | | """ | | | Layer 3C β Narrative State Validation (NSV) | | | Tracks how interpretations change over time. | | | Does not determine truth. Evaluates whether: | | | - interpretation changes are declared | | | - evidence changes are tracked | | | - confidence changes are justified | | | - reconstructed narratives remain distinguishable from observations | | | If task.narrative is None: module passes through (non-narrative tasks unaffected). | | | Boundary contract: | | | NSV evaluates: how interpretation changed | | | Evidence Validation evaluates: whether evidence supports the claim | | | NSV does not determine truth. Evidence does not track narrative evolution. | | | """ | | | name = "narrative_state_validation" | | | def run(self, task: TaskInput) -> ModuleResult: | | | n = task.narrative | | | if n is None: | |
| return ModuleResult(self.name, "STABLE", data={"note": "no narrative contract supplied"}) | |
| reasons = [] | |
| constraints = [] | |
| observed = n.get("observed_input", "") | |
| initial_interp = n.get("initial_interpretation", "") | |
| updated_interp = n.get("updated_interpretation", "") | |
| new_constraints = n.get("new_constraints", []) | |
| evidence_delta = n.get("evidence_delta", []) | |
| conf_before = n.get("confidence_before", "") | |
| conf_after = n.get("confidence_after", "") | |
| retro_flag = bool(n.get("retroactive_rewrite", False)) | |
| conf_high_no_evidence = bool(n.get("confidence_high_with_low_evidence", False)) | |
| social_reinforcement = bool(n.get("social_reinforcement_as_validation", False)) | |
| # Check 1: Observation / interpretation separation | |
| if observed and initial_interp and observed.strip() == initial_interp.strip(): | |
| reasons.append(reason( | |
| ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED, | | | self.name, "CONSTRAIN", | | | "observation_equals_interpretation", | | | "Raw observation must be distinguishable from its interpretation.")) | | | constraints.append(constraint( | | | self.name, "output_type", "separate_observation_and_interpretation", | | | ReasonCode.NARRATIVE_OBSERVATION_INTERPRETATION_MERGED)) | | | # Check 2: Narrative drift tracking β confidence escalation without evidence delta | |
| CONFIDENCE_RANK = {"low": 0, "medium": 1, "high": 2} | |
| before_rank = CONFIDENCE_RANK.get(conf_before, -1) | |
| after_rank = CONFIDENCE_RANK.get(conf_after, -1) | |
| if before_rank >= 0 and after_rank > before_rank and not evidence_delta: | |
| reasons.append(reason( | |
| ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED, | | | self.name, "REVISE", | | | "confidence_increased_without_evidence_delta", | | | f"Confidence escalated {conf_before} -> {conf_after} with no declared evidence delta.")) | | | # Check 3: Reinterpretation must be linked to a declared new constraint | | | if updated_interp and updated_interp != initial_interp: | | | if not new_constraints and not evidence_delta: | | | reasons.append(reason( | | | ReasonCode.NARRATIVE_REINTERPRETATION_UNATTRIBUTED, | | | self.name, "CLARIFY", | | | "reinterpretation_without_declared_cause", | | | "Interpretation changed but no new constraint or evidence delta was declared.")) | | | # Check 4: Memory confidence separation | | | if conf_high_no_evidence or social_reinforcement: | | | reasons.append(reason( | | | ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED, | | | self.name, "CONSTRAIN", | | | "confidence_not_grounded_in_evidence", | | | "High confidence without evidence, or social/emotional reinforcement used as validation.")) | | | constraints.append(constraint( | | | self.name, "confidence", "max_medium_without_evidence", | | | ReasonCode.NARRATIVE_CONFIDENCE_EVIDENCE_DECOUPLED)) | | | # Check 5: Retroactive reconstruction detection | | | if retro_flag: | | | reasons.append(reason( | | | ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE, | | | self.name, "REVISE", | | | "later_interpretation_presented_as_original_observation", | | | "A reconstructed interpretation must not be presented as unchanged historical observation.")) | | | meta_data = { | | | "confidence_before": conf_before, | | | "confidence_after": conf_after, | | | "evidence_delta_declared": bool(evidence_delta) | | | } | | | if any(r.severity == "BLOCK" for r in reasons): | | | return ModuleResult(self.name, "BLOCK", reasons, constraints, data=meta_data) | | | if any(r.code == ReasonCode.RETROACTIVE_NARRATIVE_COLLAPSE or | | | r.code == ReasonCode.NARRATIVE_CONFIDENCE_ESCALATION_UNDECLARED | | | for r in 
reasons): | | | return ModuleResult(self.name, "REVISE", reasons, constraints, data=meta_data) | | | if reasons: | |
| top_severity = max((r.severity for r in reasons), | |
| key=lambda s: {"INFO": 0, "CLARIFY": 1, "CONSTRAIN": 2, "REVISE": 3}.get(s, 0)) | |
| return ModuleResult(self.name, top_severity, reasons, constraints, data=meta_data) | | | return ModuleResult(self.name, "STABLE", data=meta_data) | | | # ββ Layer 4: Risk Analysis βββββββββββββββββββββββββββββββββββββββββββββββββ | | | class RiskAnalysisModule: | | | name = "risk" | |
| def run(self, task: TaskInput) -> ModuleResult: | |
| risk = task.risk or {} | |
| reasons = [] | |
| irreversible = bool(risk.get("irreversible", False)) | |
| cascading = bool(risk.get("cascading_failure_surface", False)) | |
| fragile = bool(risk.get("single_fragile_assumption", False)) | |
| mitigation_available = bool(risk.get("mitigation_available", True)) | |
| evidence_confidence = risk.get("evidence_confidence", "medium") | | | if cascading: | | | reasons.append(reason(ReasonCode.RISK_CASCADING_FAILURE_SURFACE, | | | self.name, "CRITICAL", "cascading_failure_surface")) | | | if irreversible and evidence_confidence == "low": | | | reasons.append(reason(ReasonCode.RISK_IRREVERSIBLE_LOW_CONFIDENCE, | | | self.name, "CRITICAL", "irreversible_low_confidence")) | | | if fragile: | | | reasons.append(reason(ReasonCode.RISK_FRAGILE_ASSUMPTION, | | | self.name, "CRITICAL", "single_fragile_assumption")) | | | if reasons: | | | if not mitigation_available or cascading or (irreversible and evidence_confidence == "low"): | | | reasons.append(reason(ReasonCode.RISK_UNRESOLVED_CRITICAL, | | | self.name, "BLOCK", "critical_unresolved")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | return ModuleResult(self.name, "CRITICAL", reasons) | | | if irreversible or risk.get("external_users_affected", False): | | | return ModuleResult(self.name, "RISK", | | | [reason("RISK_REVERSIBILITY_OR_EXTERNAL_IMPACT", | | | self.name, "RISK", "bounded_risk")]) | | | return ModuleResult(self.name, "SAFE") | | | # ββ Layer 5: Communication Validation βββββββββββββββββββββββββββββββββββββ | | | class CommunicationValidationModule: | | | name = "communication" | | | def run(self, task: TaskInput) -> ModuleResult: | | | text = task.proposed_output or "" | | | if not text: | |
| return ModuleResult(self.name, "VALID", data={"note": "no proposed output supplied"}) | |
| reasons = [] | |
| if confidence_word(text): | |
| reasons.append(reason(ReasonCode.COMMUNICATION_TONE_OVERCLAIM, | | | self.name, "REVISE", "certainty_language")) | | | if contains_any(text, ["you are the only one", "this defines you", "your identity is"]): | | | reasons.append(reason(ReasonCode.COMMUNICATION_IDENTITY_BINDING, | | | self.name, "BLOCK", "identity_binding")) | | | return ModuleResult(self.name, "BLOCK", reasons) | | | if len(text) < 25 and task.metadata.get("source_detail_length", 0) > 500: | | | reasons.append(reason(ReasonCode.COMMUNICATION_COMPRESSION_LOSS, | | | self.name, "REVISE", "severe_compression")) | | | if reasons: | | | return ModuleResult(self.name, "REVISE", reasons) | | | return ModuleResult(self.name, "VALID") | | | # ββ Layer 6: Evidence Validation βββββββββββββββββββββββββββββββββββββββββββ | | | class EvidenceValidationModule: | | | name = "evidence" | | | def run(self, task: TaskInput) -> ModuleResult: | | | output = task.proposed_output or "" | |
| evidence = task.evidence or {} | |
| reasons = [] | |
| constraints = [] | |
| external_validation = bool(evidence.get("external_validation", False)) | |
| baseline = evidence.get("baseline", None) | |
| observations = evidence.get("observations", []) | |
| if causal_word(output) and not evidence.get("causality_validated", False): | |
| reasons.append(reason(ReasonCode.EVIDENCE_CAUSALITY_OVERCLAIM, | | | self.name, "BLOCK", "causality_without_mapping")) | | | if confidence_word(output) and not external_validation: | | | reasons.append(reason(ReasonCode.EVIDENCE_CONFIDENCE_AS_TRUTH, | | | self.name, "BLOCK", "confidence_as_truth")) | | | if dashboard_authority_word(output) and not external_validation: | | | reasons.append(reason(ReasonCode.EVIDENCE_DASHBOARD_AUTHORITY, | | | self.name, "BLOCK", "dashboard_or_status_as_authority")) | | | if baseline is None and contains_any(output, ["trend", "stable", "improving", "weakening", "safe to automate"]): | | | reasons.append(reason(ReasonCode.EVIDENCE_NO_BASELINE, | | | self.name, "CONSTRAIN", "baseline_missing")) | | | constraints.append(constraint(self.name, "confidence", | | | "max_low_without_baseline", | |
| ReasonCode.EVIDENCE_NO_BASELINE)) | |
| if any(r.severity == "BLOCK" for r in reasons): | |
| return ModuleResult(self.name, "BLOCK", reasons, constraints) | | | if reasons or constraints: | | | return ModuleResult(self.name, "CONSTRAIN", reasons, constraints) | | | if observations or external_validation: | | | return ModuleResult(self.name, "RELEASE") | | | return ModuleResult(self.name, "HOLD", | | | [reason(ReasonCode.EVIDENCE_NO_BASELINE, self.name, "HOLD", | | | "no_observations_or_validation")]) | | | # ββ Layer 7: Decision Aggregator βββββββββββββββββββββββββββββββββββββββββββ | | | MODULE_TO_FINAL: Dict[str, FinalDecision] = { | | | "PROCEED": FinalDecision.AUTHORIZE, | | | "READY": FinalDecision.AUTHORIZE, | | | "PASS": FinalDecision.AUTHORIZE, | | | "STABLE": FinalDecision.AUTHORIZE, | | | "SAFE": FinalDecision.AUTHORIZE, | | | "VALID": FinalDecision.AUTHORIZE, | | | "RELEASE": FinalDecision.AUTHORIZE, | | | "DEFER": FinalDecision.HOLD, | | | "HALT": FinalDecision.HALT, | | | "FAIL": FinalDecision.CLARIFY, | | | "CLARIFY": FinalDecision.CLARIFY, | | | "CONSTRAIN": FinalDecision.CONSTRAIN, | | | "CONSTRAINED": FinalDecision.CONSTRAIN, | | | "REVISE": FinalDecision.REVISE, | | | "HOLD": FinalDecision.HOLD, | | | "UNSTABLE": FinalDecision.HOLD, | | | "RISK": FinalDecision.CONSTRAIN, | | | "CRITICAL": FinalDecision.BLOCK, | | | "BLOCK": FinalDecision.BLOCK, | | | } | | | RESTRICTIVENESS: Dict[str, int] = { | | | "full_output": 0, | | | "analysis_only": 1, | | | "analysis_only_no_persuasion": 2, | | | "separate_observation_and_interpretation": 2, | | | "max_medium_without_evidence": 3, | | | "max_low_without_baseline": 3, | | | "simulation_only": 3, | | | "no_execution": 4, | | | "BLOCK": 5, | | | } | | | class DecisionAggregator: | | | name = "decision_aggregator" | | | def aggregate(self, results: List[ModuleResult]) -> Dict[str, Any]: | | | final = FinalDecision.AUTHORIZE | |
| all_constraints: List[Constraint] = [] | |
| all_reasons: List[Reason] = [] | |
| all_violations: List[ScopeViolation] = [] | |
| for r in results: | | | all_constraints.extend(r.constraints) | | | all_reasons.extend(r.reasons) | | | all_violations.extend(r.scope_violations) | | | mapped = MODULE_TO_FINAL.get(r.decision, FinalDecision.HOLD) | | | if FINAL_PRIORITY[mapped] > FINAL_PRIORITY[final]: | | | final = mapped | |
| merged = self._merge_constraints(all_constraints) | |
| if any(c.severity == "BLOCK" for c in merged): | |
| final = FinalDecision.BLOCK | | | if final == FinalDecision.AUTHORIZE and any(c.severity not in ("INFO",) for c in merged): | | | final = FinalDecision.CONSTRAIN | | | return { | | | "decision": final.value, | |
| "constraints": [asdict(c) for c in merged], | |
| "reasons": [asdict(r) for r in all_reasons], | |
| "scope_violations": [asdict(v) for v in all_violations], | |
| } | | | @staticmethod | |
def _merge_constraints(constraints: List[Constraint]) -> List[Constraint]:
    """Collapse a list of constraints to at most one entry per scope.

    Static helper of ``DecisionAggregator``.

    Rules, applied in input order:

    * A constraint with a falsy ``scope`` or ``bound`` is replaced by a
      fresh ``Constraint`` flagged with ``ReasonCode.CONSTRAINT_INVALID``
      and ``"BLOCK"``, stored under a synthetic ``invalid_<n>`` key.
    * For two constraints on the same scope, the one whose bound ranks
      higher in ``RESTRICTIVENESS`` is kept; on a tie the later one wins.
      Bounds missing from ``RESTRICTIVENESS`` rank as 1.

    Args:
        constraints: Constraints collected from all module results.

    Returns:
        The surviving constraints, ordered by first appearance of their key.
    """
    by_scope: Dict[str, Constraint] = {}
    for item in constraints:
        if not (item.scope and item.bound):
            # Malformed constraint: record a blocking placeholder instead.
            by_scope[f"invalid_{len(by_scope)}"] = Constraint(
                item.source_module,
                "runtime",
                "constraint_invalid_requires_revision",
                ReasonCode.CONSTRAINT_INVALID,
                "BLOCK",
            )
            continue

        current = by_scope.get(item.scope)
        if current is not None:
            incoming_rank = RESTRICTIVENESS.get(item.bound, 1)
            current_rank = RESTRICTIVENESS.get(current.bound, 1)
            if incoming_rank < current_rank:
                # The constraint already held for this scope is stricter.
                continue
        by_scope[item.scope] = item
    return list(by_scope.values())
# ── Pipeline runner ────────────────────────────────────────────────────────

class LighthouseRunner:
    """
    Full pipeline: -1, 0, 1, 2, 3A, 3B, 3C, 4, 5, 6, 7

    Layer 3 sublayers run in order:
      3A  StateValidationModule
      3B  OrientationStateRegister (only gates on spatial tasks)
      3C  NarrativeStateValidationModule (only gates on narrative tasks)

    If execution control returns HALT, every later validation module is
    skipped and only the results gathered so far are aggregated. The
    receipt's ``execution_trace`` lists exactly the stages that ran.
    """

    # Trace labels of a complete (non-halted) run, in execution order.
    _FULL_TRACE = (
        "preflight", "execution_control", "input_validation",
        "structure", "state", "orientation_state_register",
        "narrative_state_validation", "risk", "communication",
        "evidence", "decision_aggregator",
    )

    def __init__(self) -> None:
        """Instantiate one module per pipeline layer plus the aggregator."""
        self.preflight = PreflightModule()
        self.execution = ExecutionControlModule()
        self.input_val = InputValidationModule()
        self.structure = StructuralValidationModule()
        self.state = StateValidationModule()
        self.osr = OrientationStateRegister()
        self.nsv = NarrativeStateValidationModule()
        self.risk = RiskAnalysisModule()
        self.communication = CommunicationValidationModule()
        self.evidence = EvidenceValidationModule()
        self.aggregator = DecisionAggregator()

    def run(self, task: TaskInput, context: Optional[RunContext] = None) -> Dict[str, Any]:
        """Run the pipeline on *task* and return the receipt dictionary.

        Args:
            task: The task to validate.
            context: Optional run context; a default ``RunContext()`` is
                used when omitted (or falsy).

        Returns:
            The receipt built by ``_receipt``.
        """
        context = context or RunContext()
        results: List[ModuleResult] = []
        trace: List[str] = []

        def record(label: str, result: ModuleResult) -> ModuleResult:
            # Keep the trace in lock-step with the results actually produced.
            trace.append(label)
            results.append(result)
            return result

        record("preflight", self.preflight.run(task))
        execution_result = record("execution_control", self.execution.run(task))

        # A HALT from execution control short-circuits all later layers.
        if execution_result.decision != "HALT":
            record("input_validation", self.input_val.run(task))
            record("structure", self.structure.run(task))
            record("state", self.state.run(task, context))              # 3A
            record("orientation_state_register", self.osr.run(task))    # 3B
            record("narrative_state_validation", self.nsv.run(task))    # 3C
            record("risk", self.risk.run(task))
            record("communication", self.communication.run(task))
            record("evidence", self.evidence.run(task))

        aggregate = self.aggregator.aggregate(results)
        trace.append("decision_aggregator")
        return self._receipt(task, context, results, aggregate, trace)

    def _receipt(self, task, context, results, aggregate,
                 execution_trace: Optional[List[str]] = None) -> Dict[str, Any]:
        """Assemble the receipt for one pipeline run.

        Args:
            task: The evaluated task; ``task.metadata`` supplies
                ``input_id``, ``revise_trace`` and ``override`` when present.
            context: Run context, serialised with ``asdict``.
            results: Module results in execution order.
            aggregate: Output of the decision aggregator; its
                ``constraints``, ``decision`` and ``scope_violations``
                entries are copied into the receipt.
            execution_trace: Labels of the stages that actually ran. When
                omitted, the full pipeline trace is reported, which is
                what this method always reported before the parameter
                existed.

        Returns:
            A dictionary suitable for ``json.dumps``.
        """
        module_map: Dict[str, Dict[str, Any]] = {}
        for r in results:
            entry = {"decision": r.decision, "reasons": [asdict(x) for x in r.reasons]}
            if r.module == "state":
                # State data is merged into the entry itself rather than
                # nested under a "data" key.
                entry.update(r.data)
            elif r.data:
                entry["data"] = r.data
            module_map[r.module] = entry

        if execution_trace is None:
            execution_trace = list(self._FULL_TRACE)

        return {
            "system_version": SYSTEM_VERSION,
            "reason_registry_version": REASON_REGISTRY_VERSION,
            "input_id": task.metadata.get("input_id", f"input-{uuid.uuid4().hex[:10]}"),
            "timestamp": utc_now(),
            "execution_state": module_map.get("execution_control", {}).get("decision", "READY"),
            "execution_trace": execution_trace,
            "run_context": asdict(context),
            "modules": module_map,
            "constraints": aggregate["constraints"],
            "decision": aggregate["decision"],
            "revise_trace": task.metadata.get("revise_trace", []),
            "scope_violations": aggregate["scope_violations"],
            "override": task.metadata.get("override", {
                "used": False, "tier": None, "authorized_by": None,
                "authorization_level": None, "reason": None,
                "timestamp": None, "ttl": None,
            }),
        }


# ── Demo cases ─────────────────────────────────────────────────────────────

def demo_cases() -> Dict[str, TaskInput]:
    """Return the built-in demo tasks, keyed by the name accepted by ``--demo``.

    A fresh dictionary of fresh ``TaskInput`` objects is built on every call.
    """
    return {
        "clean": TaskInput(
            prompt="Evaluate whether this statement can proceed.",
            proposed_output="The observed structure persisted across five windows with consistent topology.",
            intent="analysis",
            frame_type="factual",
            evidence={
                "observations": ["structure present across five windows"],
                "external_validation": True,
                "baseline": "available",
                "confidence": "medium",
                "causality_validated": True,
            },
        ),
        "overclaim": TaskInput(
            prompt="Evaluate this output.",
            proposed_output="The system clearly confirms a stable trend because the dashboard color is green.",
            intent="analysis",
            frame_type="factual",
            evidence={
                "observations": ["dashboard_color=green"],
                "external_validation": False,
                "baseline": None,
                "confidence": "low",
                "causality_validated": False,
            },
        ),
        "risk": TaskInput(
            prompt="Should the system proceed with an irreversible migration?",
            proposed_output="Proceed with the migration.",
            intent="analysis",
            frame_type="operational",
            known_unknowns=["rollback feasibility", "dependency map"],
            risk={
                "irreversible": True,
                "cascading_failure_surface": True,
                "mitigation_available": False,
                "evidence_confidence": "low",
                "external_users_affected": True,
            },
        ),
        "drift": TaskInput(
            prompt="Calculate state stability.",
            proposed_output="State drift is constrained.",
            intent="analysis",
            frame_type="factual",
            state={
                "invariant_violations": 1,
                "boundary_crossings": 1,
                "assumption_changes": 2,
                "inconsistencies": 1,
                "prediction_divergence": True,
                "shadow_state_available": False,
                "requires_shadow_state": True,
            },
            evidence={"observations": ["state signals provided"], "baseline": "declared"},
        ),
        "spatial_ambiguous": TaskInput(
            prompt="A wheel spins clockwise. Which way is it rotating?",
            intent="analysis",
            frame_type="factual",
            orientation={
                "frame_id": "observer_unspecified",
            },
        ),
        "spatial_locked": TaskInput(
            prompt="Viewed from above, a sphere starts at (1,1,1). The cube rotates 90 degrees clockwise.",
            intent="analysis",
            frame_type="factual",
            orientation={
                "orientation_state_id": "Y_CW_90_TOP_DOWN",
                "frame_id": "observer_global",
                "object_attachment": "independent_of_cube",
                "object_state": {
                    "object_id": "red_sphere",
                    "coordinates": [1, 1, 1],
                    "coordinate_frame": "observer_global",
                },
                "transform_event": {
                    "axis": "Y",
                    "rotation": "90_CW",
                    "reference_view": "top_down",
                },
                "movement": {
                    "axis": "local_x",
                    "magnitude": 1,
                },
            },
        ),
        "spatial_attached": TaskInput(
            prompt="Sphere at (1,1,1) is attached to the cube. Cube rotates 90 degrees CW top-down.",
            intent="analysis",
            frame_type="factual",
            orientation={
                "orientation_state_id": "Y_CW_90_TOP_DOWN",
                "frame_id": "observer_global",
                "object_attachment": "attached_to_cube",
                "object_state": {
                    "object_id": "red_sphere",
                    "coordinates": [1, 1, 1],
                    "coordinate_frame": "observer_global",
                },
                "transform_event": {
                    "axis": "Y",
                    "rotation": "90_CW",
                    "reference_view": "top_down",
                },
                "movement": {
                    "axis": "local_x",
                    "magnitude": 1,
                },
            },
        ),
        "narrative_drift": TaskInput(
            prompt="Evaluate this interpretation update.",
            intent="analysis",
            frame_type="factual",
            narrative={
                "observed_input": "The dashboard is green.",
                "initial_interpretation": "The system may be operating normally.",
                "updated_interpretation": "The system is confirmed stable.",
                "new_constraints": [],
                "evidence_delta": [],
                "confidence_before": "low",
                "confidence_after": "high",
            },
        ),
        "narrative_retro": TaskInput(
            prompt="Evaluate whether this memory reconstruction is valid.",
            intent="analysis",
            frame_type="factual",
            narrative={
                "observed_input": "Pattern A was observed three times.",
                "initial_interpretation": "Pattern A may indicate instability.",
                "updated_interpretation": "We always knew Pattern A was the root cause.",
                "new_constraints": [],
                "evidence_delta": [],
                "confidence_before": "medium",
                "confidence_after": "high",
                "retroactive_rewrite": True,
            },
        ),
        "narrative_clean": TaskInput(
            prompt="Evaluate this properly attributed interpretation update.",
            intent="analysis",
            frame_type="factual",
            narrative={
                "observed_input": "System logged 5 errors in window 3.",
                "initial_interpretation": "Possible instability in module X.",
                "updated_interpretation": "Confirmed instability in module X.",
                "new_constraints": ["baseline comparison now available"],
                "evidence_delta": ["external_audit_confirmed_module_X_failure"],
                "confidence_before": "low",
                "confidence_after": "high",
                "retroactive_rewrite": False,
            },
        ),
    }
def task_from_json(path: str) -> TaskInput:
    """Load a ``TaskInput`` from a UTF-8 encoded JSON file.

    The file's top-level value must be a JSON object; its members are
    passed to ``TaskInput`` as keyword arguments.

    Args:
        path: Location of the JSON file.

    Returns:
        The ``TaskInput`` built from the file contents.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        TypeError: If the top-level JSON value is not an object.
    """
    with open(path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)

    # json.load may return a list, string, number, bool or None; unpacking
    # any of those with ** fails with an error that never mentions the file.
    if not isinstance(payload, dict):
        raise TypeError(
            f"{path}: task input must be a JSON object, "
            f"got {type(payload).__name__}"
        )
    return TaskInput(**payload)
def main() -> None:
    """Command-line entry point: evaluate one task and print its receipt as JSON.

    ``--input`` takes precedence over ``--demo``; when neither is given the
    ``overclaim`` demo is run. ``--pretty`` indents the output by two spaces.
    """
    cases = demo_cases()

    cli = argparse.ArgumentParser(description="Public Lighthouse Core v1.7 reference runner")
    cli.add_argument("--demo", choices=list(cases), help="Run a built-in demo case")
    cli.add_argument("--input", help="Path to JSON task input")
    cli.add_argument("--pretty", action="store_true", help="Pretty-print JSON")
    opts = cli.parse_args()

    task = task_from_json(opts.input) if opts.input else cases[opts.demo or "overclaim"]

    indent = 2 if opts.pretty else None
    print(json.dumps(LighthouseRunner().run(task), indent=indent))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()