#!/usr/bin/env python3
"""
A reference agent harness for blog.pdj.dev - the runnable companion to the
four essays. It embodies their ideas in ~one file, in the Hermes function-
calling style (structured <tool_call> tags, a reason-act loop).

What each piece realizes:
  - Reason-act loop + Hermes <tool_call>/<tool_response> protocol  (the harness)
  - Capability-scoped tools, composed by intersection / least authority
        -> "The Orchestration Substrate", Proposition 3
  - Verifier-gated output with resampling (generate-and-check)
        -> "The Verifier's Advantage", Propositions 1-4
  - A step/sample budget that makes runaway behavior self-limiting
        -> "Swarm Economies", Proposition 3
  - A bounded resident context window over the durable transcript: the model
        sees a budgeted working set, not the whole log; evict by recency
        -> "Context as Compute"
  - A durable, checkpointed transcript: a crash resumes from the last step
        instead of restarting the run
        -> "The Orchestration Substrate", Theorem 1
  - Sub-agent spawn under an attenuated capability: a fleet is a spawn tree of
        harnesses, each no stronger than its parent (authority can only shrink)
        -> "Swarm Economies" + "The Orchestration Substrate", Proposition 3

Run:  python3 harness.py   (tests: python3 test_harness.py)
Deterministic, standard library only, no network. The model here is a scripted
stub so the harness runs with no API key; a real Hermes / OpenAI-compatible
model implements the same `Model` protocol (see `Model` and `ScriptedModel`).
"""

from __future__ import annotations
import ast
import json
import operator
import re
from dataclasses import dataclass, field
from typing import Callable, Protocol

# ---------------------------------------------------------------------------
# Capabilities: authority is an unforgeable, attenuable token, not ambient.
# Composition is by intersection, so a child capability can never exceed its
# parent (least authority; bounded blast radius by construction).
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Capability:
    """An immutable set of named grants; attenuation can only remove grants.

    Because ``attenuate`` intersects with what is already held, a derived
    capability never holds a grant its source did not.
    """

    grants: frozenset[str]

    @staticmethod
    def of(*names: str) -> "Capability":
        """Build a capability holding exactly ``names``."""
        return Capability(grants=frozenset(names))

    def attenuate(self, *names: str) -> "Capability":
        """A weaker capability: our grants intersected with ``names``."""
        return Capability(grants=self.grants.intersection(names))

    def permits(self, name: str) -> bool:
        """True iff ``name`` is one of the held grants."""
        held = self.grants
        return name in held


# ---------------------------------------------------------------------------
# Tools. Each declares the capability it requires; the harness enforces it.
# ---------------------------------------------------------------------------
@dataclass
class Tool:
    """A callable the model may invoke, guarded by a named capability."""

    name: str                           # name advertised to the model in the system prompt
    description: str                    # short summary shown in the system prompt
    requires: str                       # capability name needed to invoke
    fn: Callable[[dict], str]           # implementation: arguments dict -> result text
    signature: str = ""                 # JSON sketch of the expected arguments

# Safe arithmetic: parse to an AST and walk only numeric/operator nodes.
# No eval, no names, no calls - arbitrary code cannot run.
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
    ast.Div: operator.truediv, ast.Pow: operator.pow, ast.USub: operator.neg,
    ast.UAdd: operator.pos, ast.Mod: operator.mod,
}

# Cap on the estimated bit length of an integer power. Without it, a tiny
# model-supplied expression such as 9**9**9 would pin the CPU and exhaust
# memory. "No code can run" would then not imply "the tool always returns".
_MAX_POW_BITS = 100_000

def _arith(node: ast.AST) -> float:
    """Evaluate an arithmetic-only expression AST.

    Accepts int/float literals (bools excluded) combined with the operators
    in ``_OPS``. Raises ValueError for any other node, for an integer power
    whose result would exceed ``_MAX_POW_BITS`` bits, and for a complex
    result (e.g. ``(-8) ** 0.5``). Arithmetic errors such as
    ZeroDivisionError or OverflowError propagate to the caller.
    """
    if isinstance(node, ast.Constant):
        # bool is a subclass of int; True/False are not arithmetic literals.
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        left, right = _arith(node.left), _arith(node.right)
        # Only int ** positive-int can blow up; float powers overflow cheaply
        # with OverflowError, and negative exponents produce floats.
        if (isinstance(node.op, ast.Pow) and isinstance(left, int)
                and isinstance(right, int) and right > 0 and abs(left) > 1
                and left.bit_length() * right > _MAX_POW_BITS):
            raise ValueError("calc: exponent too large")
        value = _OPS[type(node.op)](left, right)
        if isinstance(value, complex):         # e.g. a negative base to a fractional power
            raise ValueError("calc: result is not a real number")
        return value
    elif isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_arith(node.operand))
    raise ValueError("calc accepts arithmetic only")

def _calc(args: dict) -> str:
    """Tool entry point: evaluate ``args["expression"]`` and render the result.

    Integer-valued results (including floats such as ``6/3``) are rendered
    without a decimal point. Parse and evaluation errors propagate; the
    harness reports them to the model as tool errors.
    """
    expr = str(args.get("expression", ""))
    value = _arith(ast.parse(expr, mode="eval").body)
    # Only floats need normalising. Calling float() on an int would raise
    # OverflowError for valid results beyond ~1.8e308 (e.g. 10**400).
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return str(value)

def _search(args: dict) -> str:
    """Return the first canned note whose key occurs in ``args["query"]``.

    The query is lower-cased and tested for each key as a substring, in table
    order; "no results" is returned when nothing matches.
    """
    # A stub knowledge source (deterministic; no network).
    notes = (
        ("multiplication", "Multiplication is repeated addition (Peano, 1889)."),
        ("gittins index", "Optimal index policy for bandit allocation (Gittins, 1979)."),
    )
    query = str(args.get("query", "")).lower()
    for key, note in notes:
        if key in query:
            return note
    return "no results"

def _write_file(args: dict) -> str:
    """Stub writer: report the target path without touching the filesystem."""
    destination = args.get("path")
    return f"wrote {destination}"        # never reached in the demo (no cap)

# Default tool registry, keyed by each tool's own name.
TOOLS: dict[str, Tool] = {
    tool.name: tool
    for tool in (
        Tool("calc", "Evaluate an arithmetic expression.", "compute", _calc,
             '{"expression": "<arithmetic>"}'),
        Tool("search", "Look up a short factual note.", "net.read", _search,
             '{"query": "<terms>"}'),
        Tool("write_file", "Write a file to disk.", "fs.write", _write_file,
             '{"path": "<path>", "content": "<text>"}'),
    )
}


# ---------------------------------------------------------------------------
# Hermes function-calling protocol: tools are declared in the system prompt
# inside <tools>; the model emits <tool_call>{json}</tool_call>; the harness
# returns <tool_response>{json}</tool_response>.
# ---------------------------------------------------------------------------
# One capture per call: the JSON object between the tags. DOTALL lets the
# object span lines. The lazy `.*?` still reaches a nested closing brace,
# because the match must be followed by optional whitespace and `</tool_call>`.
_TOOL_CALL_RE = re.compile(
    r"<tool_call>\s*(\{.*?\})\s*</tool_call>",
    flags=re.DOTALL,
)

def system_prompt(tools: list[Tool]) -> str:
    """Build the Hermes-style system prompt advertising ``tools``.

    Each tool becomes one JSON line (name, description, and its signature
    string under "arguments") inside a <tools> block, after fixed
    instructions on how to call tools and how to give the final answer.
    """
    declarations = []
    for tool in tools:
        spec = {"name": tool.name, "description": tool.description, "arguments": tool.signature}
        declarations.append(json.dumps(spec))
    instructions = (
        "You are a tool-using agent. Reason, then act. To call a tool, emit\n"
        '<tool_call>{"name": ..., "arguments": {...}}</tool_call>.\n'
        "When you have the answer, reply with FINAL: <answer>.\n"
    )
    return instructions + "<tools>\n" + "\n".join(declarations) + "\n</tools>"

def parse_tool_calls(text: str) -> list[dict]:
    """Decode every <tool_call> JSON payload in ``text``, in order of appearance.

    Payloads that fail to decode as JSON are skipped without error.
    """
    calls: list[dict] = []
    for payload in _TOOL_CALL_RE.findall(text):
        try:
            decoded = json.loads(payload)
        except json.JSONDecodeError:
            continue                      # best-effort: ignore malformed calls
        calls.append(decoded)
    return calls

def final_answer(text: str) -> str | None:
    """Return the stripped text after the first "FINAL:" marker, else None.

    The answer runs to the end of ``text`` (newlines included).
    """
    match = re.search(r"FINAL:\s*(.+)", text, re.DOTALL)
    if match is None:
        return None
    return match.group(1).strip()


# ---------------------------------------------------------------------------
# Model interface. A real Hermes / OpenAI-compatible client implements
# `complete(messages) -> str`; ScriptedModel is a deterministic stand-in so
# the harness is runnable and reproducible with no API.
# ---------------------------------------------------------------------------
class Model(Protocol):
    """Structural interface: anything that maps a message list to a reply."""

    def complete(self, messages: list[dict]) -> str:
        """Return the next assistant turn for ``messages``."""
        ...

@dataclass
class ScriptedModel:
    """Deterministic Model stand-in: replays ``turns`` one per call.

    Once the script is exhausted, the last turn is repeated on every further
    call; an empty script raises IndexError. ``_i`` counts successful calls.
    """
    turns: list[str]
    _i: int = 0

    def complete(self, messages: list[dict]) -> str:
        del messages                      # the script ignores context; it replays turns in order
        last = len(self.turns) - 1
        turn = self.turns[self._i if self._i < last else last]
        self._i += 1
        return turn


# ---------------------------------------------------------------------------
# Context as compute: the full transcript is the durable record (cold storage),
# but only a bounded, most-recent working set is sent to the model each step.
# Residency is budgeted and older turns are evicted - the context window is the
# top of a memory hierarchy, not an infinite scratchpad. (Context as Compute.)
# ---------------------------------------------------------------------------
@dataclass
class ContextWindow:
    """Recency-based residency over a transcript under an approximate token budget.

    ``apply`` always keeps the first two messages (system prompt and task).
    It then keeps as many of the newest remaining messages as fit within
    ``token_budget``; the single newest one is kept even if it alone exceeds
    the budget. If anything was dropped, a system note marking the eviction
    is inserted between the head and the kept messages.
    """
    token_budget: int = 512                       # approx resident tokens beyond system + task

    @staticmethod
    def _tokens(msg: dict) -> int:
        """Approximate token count of a message: chars // 4, at least 1."""
        estimate = len(msg["content"]) // 4       # cheap chars-per-token proxy
        return estimate if estimate > 1 else 1

    def apply(self, messages: list[dict]) -> list[dict]:
        """Return the resident working set for ``messages`` as a new list."""
        if len(messages) <= 2:
            return list(messages)
        head, history = messages[:2], messages[2:]
        # Walk a cut point backwards from the end: history[kept_from:] is resident.
        kept_from = len(history)
        spent = 0
        while kept_from > 0:
            cost = self._tokens(history[kept_from - 1])
            nothing_kept_yet = kept_from == len(history)
            if not nothing_kept_yet and spent + cost > self.token_budget:
                break
            spent += cost
            kept_from -= 1
        window = [*head]
        if kept_from > 0:                         # something was evicted
            window.append({"role": "system",
                           "content": "[earlier steps evicted to stay within the context "
                                      "budget; the full transcript is retained durably]"})
        window.extend(history[kept_from:])
        return window


# ---------------------------------------------------------------------------
# The harness: a reason-act loop with capability enforcement, a step budget,
# a verifier gate with resampling, a bounded resident context window, and a
# checkpointed (resumable) transcript.
# ---------------------------------------------------------------------------
@dataclass
class Result:
    """Outcome of a harness run: the answer and how it was reached."""

    answer: str | None                   # final answer text, or None when nothing was accepted
    accepted: bool                       # whether the verifier accepted `answer`
    steps: int                           # reason-act steps in the reported sample
    samples: int                         # generate-and-check samples drawn
    transcript: list[dict] = field(default_factory=list)   # checkpoint entries, one per step

@dataclass
class Harness:
    """A reason-act loop with capability enforcement, budgets, and a verifier gate.

    ``run`` draws up to ``sample_budget`` samples; each sample takes up to
    ``step_budget`` reason-act steps. A tool call is dispatched only when
    ``capability`` permits the tool's ``requires`` grant. Every assistant
    turn, plus the tool responses it produced, is appended to a checkpoint
    that ``run(resume=...)`` replays. Each model call sees only the window
    returned by ``context.apply``.
    """
    model: Model
    capability: Capability
    verifier: Callable[[str], bool]
    # A per-harness copy: mutating one harness's registry must not leak into
    # the module-level TOOLS table or into other harnesses built from it.
    tools: dict[str, Tool] = field(default_factory=lambda: dict(TOOLS))
    step_budget: int = 6                 # max reason-act steps per sample
    sample_budget: int = 3               # max generate-and-check resamples
    context: ContextWindow = field(default_factory=ContextWindow)
    log: Callable[[str], None] = print

    @staticmethod
    def _tool_message(resp: str) -> dict:
        """Wrap a JSON tool response in the Hermes <tool_response> envelope."""
        return {"role": "tool", "content": f"<tool_response>{resp}</tool_response>"}

    def _dispatch(self, call: dict) -> str:
        """Run one parsed tool call and return a JSON-encoded tool response.

        Failures are returned to the model as ``{"error": ...}`` instead of
        being raised: unknown tool, missing capability, malformed arguments,
        or an exception inside the tool.
        """
        name = call.get("name", "")
        # Only a str can be a registry key; an unhashable name (e.g. a list
        # from a confused model) would otherwise raise TypeError on lookup.
        tool = self.tools.get(name) if isinstance(name, str) else None
        if tool is None:
            return json.dumps({"error": f"unknown tool {name!r}"})
        if not self.capability.permits(tool.requires):     # least-authority gate
            self.log(f"  [capability] DENIED {name} (needs '{tool.requires}')")
            return json.dumps({"error": f"capability '{tool.requires}' not held"})
        args = call.get("arguments", {})
        if isinstance(args, str):
            # OpenAI-compatible models often encode arguments as a JSON string.
            try:
                args = json.loads(args)
            except json.JSONDecodeError as e:
                return json.dumps({"error": f"malformed arguments: {e}"})
        if args is None:                                    # treat null as "no arguments"
            args = {}
        if not isinstance(args, dict):
            return json.dumps({"error": "arguments must be a JSON object"})
        try:
            return json.dumps({"result": tool.fn(args)})
        except Exception as e:                              # tool failure is data
            return json.dumps({"error": f"{type(e).__name__}: {e}"})

    def _replay(self, task: str, checkpoint: list[dict]) -> list[dict]:
        """Rebuild the conversation from a checkpoint.

        The result is the system prompt, then the task, then each logged
        assistant turn followed by the tool responses recorded for it.
        """
        messages = [{"role": "system", "content": system_prompt(list(self.tools.values()))},
                    {"role": "user", "content": task}]
        for entry in checkpoint:
            messages.append({"role": "assistant", "content": entry["assistant"]})
            # Final/no-action turns (and checkpoints written without recorded
            # observations) carry no "tool_responses" key; replay nothing then.
            messages.extend(self._tool_message(r) for r in entry.get("tool_responses", ()))
        return messages

    def _run_once(self, messages: list[dict], checkpoint: list[dict]) -> str | None:
        """One sample: reason-act until a final answer, a non-action, or the step budget.

        Mutates ``messages`` (the full conversation) and ``checkpoint`` (the
        replayable log) in place. Steps already in ``checkpoint`` count
        against the step budget, so a resumed sample continues its numbering.
        Returns the final answer, or None if the sample ended without one.
        """
        for step in range(len(checkpoint), self.step_budget):
            reply = self.model.complete(self.context.apply(messages))   # bounded resident window

            messages.append({"role": "assistant", "content": reply})
            entry = {"step": step, "assistant": reply}
            checkpoint.append(entry)            # logged before the reply is acted on

            ans = final_answer(reply)
            if ans is not None:
                self.log(f"  step {step}: FINAL -> {ans}")
                return ans

            calls = parse_tool_calls(reply)
            if not calls:
                self.log(f"  step {step}: (no action) -> halting")
                return None
            # Record observations alongside the call so a resume can replay them
            # instead of showing the model a tool call with no response.
            entry["tool_responses"] = responses = []
            for call in calls:
                self.log(f"  step {step}: tool_call {call.get('name')}({call.get('arguments', {})})")
                resp = self._dispatch(call)
                self.log(f"            tool_response {resp}")
                responses.append(resp)
                messages.append(self._tool_message(resp))
        self.log(f"  [budget] step budget ({self.step_budget}) exhausted")
        return None

    def run(self, task: str, resume: list[dict] | None = None) -> Result:
        """Generate-and-check: sample until the verifier accepts or samples run out.

        ``resume`` is a checkpoint from an earlier, interrupted run. The first
        sample continues from it instead of re-running its steps; later
        samples start fresh. On failure, the Result carries the last sample's
        transcript and step count.
        """
        checkpoint = list(resume or [])
        for sample in range(self.sample_budget):
            if sample:
                checkpoint = []                              # fresh sample after a rejection
            # Durable execution: rebuild the conversation from the checkpoint
            # (replaying recorded observations) instead of restarting the run.
            messages = self._replay(task, checkpoint)
            self.log(f"sample {sample}:")
            ans = self._run_once(messages, checkpoint)
            if ans is not None and self.verifier(ans):       # generate-and-check
                self.log("  [verifier] ACCEPT")
                return Result(ans, True, len(checkpoint), sample + 1, checkpoint)
            if ans is not None:
                self.log("  [verifier] REJECT -> resample")
        # Keep the last sample's transcript so a failed run stays inspectable.
        return Result(None, False, len(checkpoint), self.sample_budget, checkpoint)

    def spawn(self, task: str, grant: tuple[str, ...], model: Model,
              verifier: Callable[[str], bool] | None = None) -> Result:
        """Delegate a sub-task to a child agent under an attenuated capability.

        The child's authority is the intersection of ours and what we grant, so
        it can never exceed the parent: authority is monotone non-increasing
        down the spawn tree (least authority; the blast radius of any
        descendant is bounded by the meet along its spawn path). The child
        inherits our tools, step and sample budgets, context window, and log
        sink; ``verifier`` defaults to ours. This is the multi-agent
        primitive: a fleet is a spawn tree of harnesses, each no stronger
        than the one above it.
        """
        child = Harness(
            model=model,
            capability=self.capability.attenuate(*grant),
            verifier=verifier or self.verifier,
            tools=self.tools,
            step_budget=self.step_budget,
            sample_budget=self.sample_budget,
            context=self.context,
            log=self.log,
        )
        self.log(f"  [spawn] child capability {sorted(child.capability.grants)} "
                 f"= parent {sorted(self.capability.grants)} & granted {sorted(grant)}")
        return child.run(task)


# ---------------------------------------------------------------------------
# Demonstration. Task: compute 17 * 23 and cite a source. The scripted model
# reasons, calls calc, tries an unauthorized write (denied), searches, and
# answers. We grant {compute, net.read} but NOT fs.write, so the write is
# refused by construction. The verifier requires the computed number present.
# ---------------------------------------------------------------------------
def demo() -> Result:
    """Walk through the harness features and return the main task's Result.

    Prints, in order:
    - the main task under an attenuated capability (its write attempt is denied);
    - a resume from that run's checkpoint truncated after step 0;
    - a sub-agent spawned under an attenuated grant;
    - the bounded context window applied to a 42-message transcript.
    """
    task = "Compute 17 * 23 and cite a source."
    script = [
        'I will compute the product.\n<tool_call>{"name": "calc", "arguments": {"expression": "17*23"}}</tool_call>',
        'Let me persist that.\n<tool_call>{"name": "write_file", "arguments": {"path": "/tmp/x", "content": "391"}}</tool_call>',
        'No write access; I will cite instead.\n<tool_call>{"name": "search", "arguments": {"query": "multiplication"}}</tool_call>',
        'FINAL: 17 * 23 = 391. Source: Multiplication is repeated addition (Peano, 1889).',
    ]

    # Ask for {compute, net.read, fs.write} but attenuate to {compute, net.read}:
    # the intersection drops fs.write, so write_file is unreachable by construction.
    requested = Capability.of("compute", "net.read", "fs.write")
    granted = requested.attenuate("compute", "net.read")

    def verifier(ans: str) -> bool:
        return "391" in ans                       # cheap check: the answer must carry the number

    h = Harness(model=ScriptedModel(script), capability=granted, verifier=verifier)

    # 1. Main task: calc, a denied write, a search, then the final answer.
    print(f"granted capabilities: {sorted(granted.grants)}  (fs.write attenuated away)\n")
    print("=== task: compute 17 * 23 and cite a source ===")
    result = h.run(task)
    print(f"\naccepted={result.accepted}  answer={result.answer!r}  samples={result.samples}")

    # 2. Durability: pretend the run crashed after its first checkpointed step,
    #    then resume from that checkpoint rather than re-running step 0.
    print("\n=== durable resume (crash after step 0, resume from checkpoint) ===")
    checkpoint_after_step0 = result.transcript[:1]
    resumer = Harness(model=ScriptedModel(script[1:]), capability=granted, verifier=verifier)
    resumed = resumer.run(task, resume=checkpoint_after_step0)
    print(f"\nresumed accepted={resumed.accepted}  answer={resumed.answer!r}")

    # 3. Sub-agent: the child is "granted" {net.read, fs.write}, but that is
    #    intersected with the parent's {compute, net.read}, leaving {net.read}.
    #    The child's own write attempt is therefore refused.
    print("\n=== sub-agent spawn (attenuated capability) ===")
    child_script = [
        'Caching the lookup first.\n<tool_call>{"name": "write_file", "arguments": {"path": "/tmp/c", "content": "x"}}</tool_call>',
        'No write access; searching instead.\n<tool_call>{"name": "search", "arguments": {"query": "gittins index"}}</tool_call>',
        'FINAL: Optimal index policy for bandit allocation (Gittins, 1979).',
    ]

    def cites_gittins(ans: str) -> bool:
        return "Gittins" in ans

    sub = h.spawn("Find a citation for the Gittins index.",
                  grant=("net.read", "fs.write"),
                  model=ScriptedModel(child_script),
                  verifier=cites_gittins)
    print(f"\nsub-agent accepted={sub.accepted}  answer={sub.answer!r}")

    # 4. Context as compute: the durable log keeps growing, but the model only
    #    ever sees a bounded resident window; older turns are evicted.
    print("\n=== context window (bounded resident working set) ===")
    long_log = [{"role": "system", "content": "you are an agent"},
                {"role": "user", "content": "a long task"}]
    long_log.extend({"role": "assistant", "content": f"step {i}: reasoning and a tool call"}
                    for i in range(40))
    window = ContextWindow(token_budget=60).apply(long_log)
    print(f"durable transcript: {len(long_log)} messages; resident window: {len(window)} "
          f"(system + task + most recent within budget; the rest evicted)")
    return result


if __name__ == "__main__":
    # Exit status mirrors whether the demo's main task was accepted.
    raise SystemExit(0 if demo().accepted else 1)
