How to Design an OpenHarness Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination
In this tutorial, we construct OpenHarness from scratch to higher perceive how a sensible agent harness works. We recreate the main constructing blocks that make an agent system helpful, together with software use, typed software schemas, permissions, lifecycle hooks, reminiscence, abilities, context compaction, retry logic, price monitoring, and multi-agent coordination. Instead of treating an agent framework as a black field, we expose the complete management stream and watch how the harness receives a person job, lets the mannequin determine the subsequent motion, validates and executes software calls, returns observations, and continues the loop till the duty is full. We additionally preserve the implementation runnable so we are able to experiment with the structure without having API keys or advanced infrastructure.
Setting Up the OpenHarness Core
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import varieties
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, discipline
from enum import Enum
MISSING = dataclasses.MISSING
UnionType = getattr(varieties, "UnionType", None)
def run_async(coro):
"""Run a coroutine to completion from sync code, even inside a reside loop."""
attempt:
loop = asyncio.get_running_loop()
besides RuntimeError:
loop = None
if loop shouldn't be None and loop.is_running():
attempt:
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(coro)
besides Exception:
import threading
field: dict = {}
def _runner():
new_loop = asyncio.new_event_loop()
attempt:
field["value"] = new_loop.run_until_complete(coro)
lastly:
new_loop.shut()
t = threading.Thread(goal=_runner)
t.begin()
t.be a part of()
return field["value"]
return asyncio.run(coro)
BANNER = "═" * 78
def banner(title: str) -> None:
print("n" + BANNER)
print(f" {title}")
print(BANNER)
def clarify(title: str, physique: str) -> None:
banner(title)
print(textwrap.fill(textwrap.dedent(physique).strip(), width=78))
print("-" * 78)
def quick(textual content: str, n: int = 240) -> str:
textual content = " ".be a part of(str(textual content).cut up())
return textual content if len(textual content) <= n else textual content[: n - 1] + "…"
@dataclass
class Usage:
input_tokens: int = 0
output_tokens: int = 0
def __add__(self, different: "Usage") -> "Usage":
return Usage(self.input_tokens + different.input_tokens,
self.output_tokens + different.output_tokens)
@dataclass
class ToolName:
id: str
title: str
arguments: dict
@dataclass
class AssistantTurn:
"""One flip produced by the mannequin: some textual content + zero or extra software calls."""
textual content: str = ""
tool_calls: checklist = discipline(default_factory=checklist)
stop_reason: str = "end_turn"
utilization: Usage = discipline(default_factory=Usage)
@dataclass
class Message:
"""A single message within the operating dialog transcript."""
position: str
content material: str = ""
tool_calls: checklist = discipline(default_factory=checklist)
tool_call_id: str = ""
title: str = ""
def count_tokens(textual content: str) -> int:
"""Cheap, provider-agnostic token estimate (~4 chars/token)."""
if not textual content:
return 0
return max(1, spherical(len(textual content) / 4))
PRICE_BOOK = {
"mock-sonnet": (3.00, 15.00),
"claude-sonnet-4": (3.00, 15.00),
"gpt-4.1": (2.00, 8.00),
"default": (1.00, 3.00),
}
class CostMeter:
"""Accumulates token utilization and converts it to an estimated greenback price."""
def __init__(self, mannequin: str):
self.mannequin = mannequin
self.whole = Usage()
self.calls = 0
def add(self, utilization: Usage) -> None:
self.whole = self.whole + utilization
self.calls += 1
@property
def {dollars}(self) -> float:
pin, pout = PRICE_BOOK.get(self.mannequin, PRICE_BOOK["default"])
return (self.whole.input_tokens / 1e6) * pin +
(self.whole.output_tokens / 1e6) * pout
def abstract(self) -> str:
return (f"{self.calls} mannequin name(s) | "
f"in={self.whole.input_tokens} out={self.whole.output_tokens} tok | "
f"~${self.{dollars}:.5f} ({self.mannequin})")
def fld(description: str = "", default=MISSING, default_factory=MISSING):
"""Declare a tool-input discipline with an outline (and elective default)."""
md = {"description": description}
if default_factory shouldn't be MISSING:
return discipline(default_factory=default_factory, metadata=md)
if default shouldn't be MISSING:
return discipline(default=default, metadata=md)
return discipline(metadata=md)
def _is_optional(t) -> bool:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType shouldn't be None and origin is UnionType):
return sort(None) in typing.get_args(t)
return False
def _py_to_json_type(t) -> dict:
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType shouldn't be None and origin is UnionType):
args = [a for a in typing.get_args(t) if a is not type(None)]
return _py_to_json_type(args[0]) if args else {"sort": "string"}
if t is str:
return {"sort": "string"}
if t is bool:
return {"sort": "boolean"}
if t is int:
return {"sort": "integer"}
if t is float:
return {"sort": "quantity"}
if origin is checklist or t is checklist:
args = typing.get_args(t)
merchandise = _py_to_json_type(args[0]) if args else {"sort": "string"}
return {"sort": "array", "gadgets": merchandise}
if origin is dict or t is dict:
return {"sort": "object"}
return {"sort": "string"}
def build_json_schema(model_cls) -> dict:
"""Turn a dataclass enter mannequin right into a JSON Schema (object with properties)."""
hints = typing.get_type_hints(model_cls)
props, required = {}, []
for f in dataclasses.fields(model_cls):
t = hints.get(f.title, str)
js = dict(_py_to_json_type(t))
desc = f.metadata.get("description", "")
if desc:
js["description"] = desc
props[f.name] = js
has_default = (f.default shouldn't be MISSING) or (f.default_factory shouldn't be MISSING)
if not has_default and not _is_optional(t):
required.append(f.title)
schema = {"sort": "object", "properties": props}
if required:
schema["required"] = required
return schema
def _coerce(v, t):
origin = typing.get_origin(t)
if origin is typing.Union or (UnionType shouldn't be None and origin is UnionType):
if v is None:
return None
args = [a for a in typing.get_args(t) if a is not type(None)]
return _coerce(v, args[0]) if args else v
if t is str:
return v if isinstance(v, str) else str(v)
if t is bool:
if isinstance(v, bool):
return v
if isinstance(v, str):
return v.strip().decrease() in ("1", "true", "sure", "y", "on")
return bool(v)
if t is int:
return int(v)
if t is float:
return float(v)
if origin is checklist or t is checklist:
args = typing.get_args(t)
it = args[0] if args else str
if not isinstance(v, checklist):
v = [v]
return [_coerce(x, it) for x in v]
if origin is dict or t is dict:
return dict(v) if v else {}
return v
def instantiate(model_cls, uncooked: dict):
"""Validate + coerce uncooked JSON args right into a typed enter occasion."""
hints = typing.get_type_hints(model_cls)
uncooked = uncooked or {}
kwargs = {}
for f in dataclasses.fields(model_cls):
t = hints.get(f.title, str)
if f.title in uncooked and uncooked[f.name] shouldn't be None:
attempt:
kwargs[f.name] = _coerce(uncooked[f.name], t)
besides (TypeError, ValueError) as e:
increase ValueError(f"Bad worth for '{f.title}': {e}")
elif f.default shouldn't be MISSING:
kwargs[f.name] = f.default
elif f.default_factory shouldn't be MISSING:
kwargs[f.name] = f.default_factory()
elif _is_optional(t):
kwargs[f.name] = None
else:
increase ValueError(f"Missing required argument '{f.title}'")
return model_cls(**kwargs)
class PermissionSort(Enum):
"""How harmful a software is — drives the default permission coverage."""
READ = "learn"
WRITE = "write"
EXECUTE = "execute"
META = "meta"
@dataclass
class ToolResult:
output: str
is_error: bool = False
metadata: dict = discipline(default_factory=dict)
class ToolContext:
"""Everything a software might have at runtime (providers + shared state)."""
def __init__(self, **providers):
self.__dict__.replace(providers)
class BaseTool:
"""Base class for all instruments. Subclasses set title/description/InputModel/sort
and implement `execute`. Schema + validation are dealt with right here."""
title: str = "base"
description: str = ""
InputModel = None
sort: PermissionSort = PermissionSort.READ
def schema(self) -> dict:
return {
"title": self.title,
"description": self.description,
"sort": self.sort.worth,
"input_schema": (build_json_schema(self.InputModel)
if self.InputModel else
{"sort": "object", "properties": {}}),
}
async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
args = instantiate(self.InputModel, raw_args) if self.InputModel else None
return await self.execute(args, ctx)
async def execute(self, args, ctx: ToolContext) -> ToolResult:
increase NotImplementedError
class ToolRegistry:
def __init__(self):
self._tools: dict = {}
def register(self, software: BaseTool) -> "ToolRegistry":
self._tools[tool.name] = software
return self
def get(self, title: str) -> BaseTool | None:
return self._tools.get(title)
def schemas(self) -> checklist:
return [t.schema() for t in self._tools.values()]
def names(self) -> checklist:
return checklist(self._tools)
class VirtualFS:
"""In-memory filesystem. Keeps the tutorial secure & deterministic in Colab."""
def __init__(self):
self.information: dict = {}
@staticmethod
def norm(path: str) -> str:
return path.lstrip("./").strip()
def write(self, path: str, content material: str) -> None:
self.information[self.norm(path)] = content material
def learn(self, path: str) -> str:
return self.information[self.norm(path)]
def exists(self, path: str) -> bool:
return self.norm(path) in self.information
def checklist(self, sample: str = "*") -> checklist:
return sorted(p for p in self.information if fnmatch.fnmatch(p, sample))
def tree(self) -> str:
if not self.information:
return "(empty)"
return "n".be a part of(f" {p} ({len(c)} bytes)"
for p, c in sorted(self.information.gadgets()))
class PermissionMode(Enum):
DEFAULT = "default"
AUTO = "auto"
PLAN = "plan"
@dataclass
class PermissionDetermination:
motion: str
purpose: str = ""
SENSITIVE_PATTERNS = [
"/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
"*credentials*", "*.env", "*/secrets/*",
]
class PermissionChecker:
def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
path_rules: checklist | None = None,
denied_commands: checklist | None = None):
self.mode = mode
self.path_rules = path_rules or []
self.denied_commands = denied_commands or []
def _check_path(self, path: str) -> PermissionDetermination | None:
for pat in SENSITIVE_PATTERNS:
if fnmatch.fnmatch(path, pat):
return PermissionDetermination("deny", f"delicate path '{path}' ({pat})")
for rule in self.path_rules:
if fnmatch.fnmatch(path, rule["pattern"]):
if rule.get("enable", True):
return PermissionDetermination("enable", f"path rule permits '{rule['pattern']}'")
return PermissionDetermination("deny", f"path rule blocks '{rule['pattern']}'")
return None
def _check_command(self, command: str) -> PermissionDetermination | None:
for pat in self.denied_commands:
if re.search(pat, command):
return PermissionDetermination("deny", f"denied command matched /{pat}/")
return None
def test(self, software: BaseTool, args: dict) -> PermissionDetermination:
if "path" in args and software.sort in (PermissionSort.WRITE, PermissionSort.EXECUTE):
d = self._check_path(str(args["path"]))
if d:
return d
if "command" in args:
d = self._check_command(str(args["command"]))
if d:
return d
if self.mode is PermissionMode.AUTO:
return PermissionDetermination("enable", "auto mode")
if self.mode is PermissionMode.PLAN:
if software.sort in (PermissionSort.WRITE, PermissionSort.EXECUTE):
return PermissionDetermination("deny", "plan mode blocks writes/executes")
return PermissionDetermination("enable", "plan mode permits reads")
if software.sort in (PermissionSort.READ, PermissionSort.META):
return PermissionDetermination("enable", "secure software")
return PermissionDetermination("ask", f"{software.sort.worth} requires approval")
async def auto_approve(software, args, purpose) -> bool:
print(f"
approval wanted: {software.title} ({purpose}) -> [auto-approved]")
return True
async def interactive_approve(software, args, purpose) -> bool:
ans = enter(f"
Allow {software.title}({quick(json.dumps(args), 80)})? [y/N] ")
return ans.strip().decrease().startswith("y")
@dataclass
class HookOutcome:
blocked: bool = False
purpose: str = ""
arguments: dict | None = None
class HookManager:
"""Lifecycle occasions round each software name (like PreToolUse/SubmitToolUse)."""
def __init__(self):
self.pre: checklist = []
self.submit: checklist = []
def add_pre(self, fn):
self.pre.append(fn); return self
def add_post(self, fn):
self.submit.append(fn); return self
def run_pre(self, name: ToolName, software: BaseTool, ctx: ToolContext) -> HookOutcome:
args = dict(name.arguments)
for fn in self.pre:
out = fn(name, software, ctx)
if out is None:
proceed
if out.blocked:
return out
if out.arguments shouldn't be None:
args = out.arguments
return HookOutcome(arguments=args)
def run_post(self, name, software, outcome: ToolResult, ctx) -> ToolResult:
for fn in self.submit:
new = fn(name, software, outcome, ctx)
if new shouldn't be None:
outcome = new
return outcome
We start by establishing the inspiration for the OpenHarness-style tutorial, together with imports, async execution assist, helper capabilities, and core information fashions. We outline messages, software calls, utilization monitoring, token counting, price estimation, permission modes, hooks, and the digital filesystem that retains execution secure. We use this snippet to set up the essential structure on which all subsequent instruments, agent loops, and demos rely.
Building the Tool Layer
@dataclass
class WriteFileInput:
path: str = fld("File path to write")
content material: str = fld("Full file content material")
class WriteFileTool(BaseTool):
title = "write_file"
description = "Create or overwrite a file with the given content material."
InputModel = WriteFileInput
sort = PermissionSort.WRITE
async def execute(self, args: WriteFileInput, ctx) -> ToolResult:
ctx.vfs.write(args.path, args.content material)
return ToolResult(f"Wrote {len(args.content material)} bytes to {args.path}")
@dataclass
class ReadFileInput:
path: str = fld("File path to learn")
class ReadFileTool(BaseTool):
title = "read_file"
description = "Read the complete contents of a file."
InputModel = ReadFileInput
sort = PermissionSort.READ
async def execute(self, args: ReadFileInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"No such file: {args.path}", is_error=True)
return ToolResult(ctx.vfs.learn(args.path))
@dataclass
class EditInput:
path: str = fld("File to edit")
outdated: str = fld("Exact substring to change")
new: str = fld("Replacement textual content")
class EditTool(BaseTool):
title = "edit"
description = "Replace the primary incidence of `outdated` with `new` in a file."
InputModel = EditInput
sort = PermissionSort.WRITE
async def execute(self, args: EditInput, ctx) -> ToolResult:
if not ctx.vfs.exists(args.path):
return ToolResult(f"No such file: {args.path}", is_error=True)
textual content = ctx.vfs.learn(args.path)
if args.outdated not in textual content:
return ToolResult(f"`outdated` not present in {args.path}", is_error=True)
ctx.vfs.write(args.path, textual content.change(args.outdated, args.new, 1))
return ToolResult(f"Edited {args.path}: changed 1 incidence.")
@dataclass
class ListFilesInput:
sample: str = fld("Glob sample", default="*")
class ListFilesTool(BaseTool):
title = "list_files"
description = "List information matching a glob sample."
InputModel = ListFilesInput
sort = PermissionSort.READ
async def execute(self, args: ListFilesInput, ctx) -> ToolResult:
information = ctx.vfs.checklist(args.sample)
return ToolResult("n".be a part of(information) if information else "(no matches)")
@dataclass
class GrepInput:
sample: str = fld("Regex to seek for")
path_glob: str = fld("Which information to search", default="*")
class GrepTool(BaseTool):
title = "grep"
description = "Search file contents with a daily expression."
InputModel = GrepInput
sort = PermissionSort.READ
async def execute(self, args: GrepInput, ctx) -> ToolResult:
rx = re.compile(args.sample)
hits = []
for p in ctx.vfs.checklist(args.path_glob):
for i, line in enumerate(ctx.vfs.learn(p).splitlines(), 1):
if rx.search(line):
hits.append(f"{p}:{i}: {line.strip()}")
return ToolResult("n".be a part of(hits) if hits else "(no matches)")
@dataclass
class RunPythonInput:
information: checklist = fld("VFS information to exec so as in a single namespace",
default_factory=checklist)
code: str = fld("Extra Python code to run after the information", default="")
class RunPythonTool(BaseTool):
title = "run_python"
description = ("Execute Python from the digital filesystem (and/or an inline "
"snippet) and seize stdout. Used to run assessments.")
InputModel = RunPythonInput
sort = PermissionSort.EXECUTE
async def execute(self, args: RunPythonInput, ctx) -> ToolResult:
source_parts = []
for p in args.information:
if not ctx.vfs.exists(p):
return ToolResult(f"No such file: {p}", is_error=True)
source_parts.append(ctx.vfs.learn(p))
if args.code:
source_parts.append(args.code)
supply = "nn".be a part of(source_parts)
buf = io.StringIO()
sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}
attempt:
with contextlib.redirect_stdout(buf):
exec(compile(supply, "<agent_code>", "exec"), sandbox_globals)
besides Exception as e:
frames = [f for f in traceback.extract_tb(e.__traceback__)
if f.filename == "<agent_code>"]
loc = ""
if frames:
final = frames[-1]
src_lines = supply.splitlines()
textual content = (src_lines[last.lineno - 1].strip()
if 0 < final.lineno <= len(src_lines) else "")
loc = f" (line {final.lineno}: {textual content})" if textual content else f" (line {final.lineno})"
out = buf.getvalue()
msg = f"{sort(e).__name__}: {e}{loc}"
return ToolResult(f"{out}n{msg}".strip(), is_error=True)
return ToolResult(buf.getvalue().strip() or "(ran with no output)")
@dataclass
class ShellInput:
command: str = fld("Shell command to run")
class ShellTool(BaseTool):
title = "shell"
description = "Run a shell command (simulated over the digital filesystem)."
InputModel = ShellInput
sort = PermissionSort.EXECUTE
async def execute(self, args: ShellInput, ctx) -> ToolResult:
cmd = args.command.strip()
if cmd.startswith("ls"):
return ToolResult("n".be a part of(ctx.vfs.checklist("*")) or "(empty)")
if cmd.startswith("cat "):
p = cmd[4:].strip()
if ctx.vfs.exists(p):
return ToolResult(ctx.vfs.learn(p))
return ToolResult(f"cat: {p}: No such file", is_error=True)
if cmd.startswith("echo "):
return ToolResult(cmd[5:])
return ToolResult(f"(simulated) `{cmd}` executed.")
_FAKE_WEB = {
"vector database":
"Vector databases (FAISS, Milvus, pgvector) index embeddings for "
"approximate nearest-neighbour search powering RAG.",
"agent harness":
"An agent harness is the infrastructure round an LLM: instruments, reminiscence, "
"permissions, and a loop that turns mannequin output into actual actions.",
"exponential backoff":
"Exponential backoff retries failed calls after 2^n rising delays to "
"keep away from hammering a struggling service.",
}
@dataclass
class WebSearchInput:
question: str = fld("Search question")
class WebSearchTool(BaseTool):
title = "web_search"
description = "Search the online for up-to-date data. (Mocked in tutorial.)"
InputModel = WebSearchInput
sort = PermissionSort.READ
async def execute(self, args: WebSearchInput, ctx) -> ToolResult:
await asyncio.sleep(0.05)
q = args.question.decrease()
for key, val in _FAKE_WEB.gadgets():
if key in q:
return ToolResult(f"[mock result] {val}")
return ToolResult(f"[mock result] No canned reply for '{args.question}'. "
f"(Wire an actual search API right here.)")
@dataclass
class SkillInput:
title: str = fld("Name of the ability to load")
class SkillTool(BaseTool):
title = "ability"
description = ("Load an on-demand ability (markdown playbook) into context. "
"Only load a ability proper earlier than you want its steering.")
InputModel = SkillInput
sort = PermissionSort.META
async def execute(self, args: SkillInput, ctx) -> ToolResult:
physique = ctx.abilities.load(args.title)
if physique is None:
return ToolResult(f"Unknown ability: {args.title}", is_error=True)
return ToolResult(f"Loaded ability '{args.title}':nn{physique}")
@dataclass
class RememberInput:
observe: str = fld("A sturdy reality/choice to keep in mind throughout periods")
class RememberTool(BaseTool):
title = "keep in mind"
description = "Persist a sturdy reality to long-term reminiscence (MEMORY.md)."
InputModel = RememberInput
sort = PermissionSort.META
async def execute(self, args: RememberInput, ctx) -> ToolResult:
ctx.reminiscence.append(args.observe)
return ToolResult(f"Remembered: {args.observe}")
@dataclass
class AskUserInput:
query: str = fld("A query to ask the human")
class AskUserTool(BaseTool):
title = "ask_user"
description = "Ask the human a clarifying query."
InputModel = AskUserInput
sort = PermissionSort.META
async def execute(self, args: AskUserInput, ctx) -> ToolResult:
canned = getattr(ctx, "canned_answers", {})
ans = canned.get(args.query, "(no reply configured)")
return ToolResult(f"User answered: {ans}")
@dataclass
class SpawnAgentInput:
position: str = fld("Which agent profile to spawn (e.g. 'researcher')")
job: str = fld("The job to delegate to that agent")
class SpawnAgentTool(BaseTool):
title = "spawn_agent"
description = ("Delegate a subtask to a specialised subagent and get its "
"remaining outcome. Multiple spawns in a single flip run in parallel.")
InputModel = SpawnAgentInput
sort = PermissionSort.META
async def execute(self, args: SpawnAgentInput, ctx) -> ToolResult:
outcome = await ctx.spawn(args.position, args.job)
return ToolResult(f"[{args.role}] {outcome}")
@dataclass
class Skill:
title: str
description: str
physique: str
class SkillLibrary:
"""Holds abilities. The *abstract* (title + description) is injected into the
system immediate; the *physique* is barely pulled in when the mannequin hundreds it."""
def __init__(self):
self._skills: dict = {}
self.loaded: checklist = []
@staticmethod
def parse_markdown(md: str) -> Skill:
"""Parse a ability .md with YAML-ish frontmatter (title/description)."""
title, desc, physique = "unnamed", "", md
m = re.match(r"^---s*n(.*?)n---s*n(.*)$", md, re.S)
if m:
entrance, physique = m.group(1), m.group(2)
for line in entrance.splitlines():
if ":" in line:
okay, v = line.cut up(":", 1)
if okay.strip() == "title":
title = v.strip()
elif okay.strip() == "description":
desc = v.strip()
return Skill(title=title, description=desc, physique=physique.strip())
def add_markdown(self, md: str) -> "SkillLibrary":
s = self.parse_markdown(md)
self._skills[s.name] = s
return self
def abstract(self) -> str:
if not self._skills:
return "(none)"
return "n".be a part of(f"- {s.title}: {s.description}"
for s in self._skills.values())
def load(self, title: str) -> str | None:
s = self._skills.get(title)
if s is None:
return None
if title not in self.loaded:
self.loaded.append(title)
return s.physique
class MemoryRetailer:
def __init__(self, path: str):
self.path = path
def learn(self) -> str:
if os.path.exists(self.path):
with open(self.path, "r", encoding="utf-8") as fh:
return fh.learn().strip()
return ""
def append(self, observe: str) -> None:
with open(self.path, "a", encoding="utf-8") as fh:
fh.write(f"- {observe}n")
def reset(self) -> None:
if os.path.exists(self.path):
os.take away(self.path)
We construct the sensible software layer that permits the harness to learn information, write information, edit content material, checklist information, search textual content, run Python code, simulate shell instructions, search mock internet information, load abilities, keep in mind notes, ask the person, and spawn subagents. We outline every software with typed inputs, descriptions, permissions, and executable conduct so the agent can work together with its surroundings in a structured means. We additionally add the ability library and persistent reminiscence retailer, which assist the agent load data on demand and protect helpful data throughout periods.
Defining the Model Brain Layer
class TransientLLMError(Exception):
"""A retryable error (timeout / 5xx / rate-limit)."""
class LLMBrain:
"""Interface: given the transcript + software schemas, produce the subsequent flip."""
mannequin = "summary"
async def stream(self, messages: checklist, instruments: checklist, on_event=None) -> AssistantTurn:
increase NotImplementedError
def Say(textual content: str) -> dict:
return {"remaining": True, "textual content": textual content, "calls": []}
def Use(thought: str, calls: checklist) -> dict:
return {"remaining": False, "textual content": thought, "calls": calls}
def last_tool_results(messages: checklist) -> checklist:
"""The software outcomes produced for the reason that final assistant flip."""
out = []
for m in reversed(messages):
if m.position == "software":
out.append({"title": m.title, "content material": m.content material})
elif m.position == "assistant":
break
return checklist(reversed(out))
class ScriptedBrain(LLMBrain):
mannequin = "mock-sonnet"
def __init__(self, script: checklist, title: str = "mock-sonnet"):
self.script = script
self.i = 0
self.mannequin = title
async def stream(self, messages, instruments, on_event=None) -> AssistantTurn:
if self.i >= len(self.script):
return AssistantTurn(textual content="(script exhausted)", stop_reason="end_turn")
step = self.script[self.i]
self.i += 1
motion = step(messages) if callable(step) else step
textual content = motion["text"]
if on_event and textual content:
on_event("textual content", textual content)
calls = []
for j, (tname, targs) in enumerate(motion["calls"]):
tc = ToolName(id=f"call_{self.i}_{j}", title=tname, arguments=targs)
calls.append(tc)
if on_event:
on_event("tool_call", tc)
in_tok = sum(count_tokens(m.content material) for m in messages) + 200
out_tok = count_tokens(textual content) + sum(count_tokens(json.dumps(c.arguments))
for c in calls)
return AssistantTurn(
textual content=textual content,
tool_calls=calls,
stop_reason="tool_use" if calls else "end_turn",
utilization=Usage(in_tok, out_tok),
)
class FlakyBrain(LLMBrain):
"""Wraps a mind and fails the primary `fail_times` calls with a transient
error, so we are able to watch the engine retry with exponential backoff."""
def __init__(self, internal: LLMBrain, fail_times: int = 1):
self.internal = internal
self.failures_left = fail_times
self.mannequin = internal.mannequin
async def stream(self, messages, instruments, on_event=None) -> AssistantTurn:
if self.failures_left > 0:
self.failures_left -= 1
increase TransientLLMError("simulated 503 from supplier")
return await self.internal.stream(messages, instruments, on_event)
class Remaking an attemptBrain(LLMBrain):
def __init__(self, internal: LLMBrain, retries: int = 4, base_delay: float = 0.05):
self.internal = internal
self.retries = retries
self.base_delay = base_delay
self.mannequin = internal.mannequin
async def stream(self, messages, instruments, on_event=None) -> AssistantTurn:
try = 0
whereas True:
attempt:
return await self.internal.stream(messages, instruments, on_event)
besides TransientLLMError as e:
if try >= self.retries:
increase
delay = self.base_delay * (2 ** try)
print(f"
transient error ({e}); "
f"retry {try + 1}/{self.retries} in {delay:.2f}s")
await asyncio.sleep(delay)
try += 1
class RealLLMBrain(LLMBrain):
def __init__(self, api_format: str, mannequin: str, api_key: str,
base_url: str, system: str = ""):
self.api_format = api_format
self.mannequin = mannequin
self.api_key = api_key
self.base_url = base_url.rstrip("/")
self.system = system
def _post(self, url: str, headers: dict, payload: dict) -> dict:
information = json.dumps(payload).encode()
req = urllib.request.Request(url, information=information, headers=headers, methodology="POST")
attempt:
with urllib.request.urlopen(req, timeout=60) as resp:
return json.hundreds(resp.learn().decode())
besides urllib.error.HTTPError as e:
if e.code in (429, 500, 502, 503, 504):
increase TransientLLMError(f"HTTP {e.code}")
increase
besides urllib.error.URLError as e:
increase TransientLLMError(str(e))
def _tools_anthropic(self, instruments: checklist) -> checklist:
return [{"name": t["name"], "description": t["description"],
"input_schema": t["input_schema"]} for t in instruments]
def _tools_openai(self, instruments: checklist) -> checklist:
return [{"type": "function", "function": {
"name": t["name"], "description": t["description"],
"parameters": t["input_schema"]}} for t in instruments]
def _msgs_anthropic(self, messages: checklist) -> checklist:
out = []
for m in messages:
if m.position == "person":
out.append({"position": "person", "content material": m.content material})
elif m.position == "assistant":
blocks = []
if m.content material:
blocks.append({"sort": "textual content", "textual content": m.content material})
for tc in m.tool_calls:
blocks.append({"sort": "tool_use", "id": tc.id,
"title": tc.title, "enter": tc.arguments})
out.append({"position": "assistant", "content material": blocks or m.content material})
elif m.position == "software":
out.append({"position": "person", "content material": [
{"type": "tool_result", "tool_use_id": m.tool_call_id,
"content": m.content}]})
return out
def _msgs_openai(self, messages: checklist) -> checklist:
out = []
if self.system:
out.append({"position": "system", "content material": self.system})
for m in messages:
if m.position == "person":
out.append({"position": "person", "content material": m.content material})
elif m.position == "assistant":
msg = {"position": "assistant", "content material": m.content material or None}
if m.tool_calls:
msg["tool_calls"] = [{
"id": tc.id, "type": "function",
"function": {"name": tc.name,
"arguments": json.dumps(tc.arguments)}}
for tc in m.tool_calls]
out.append(msg)
elif m.position == "software":
out.append({"position": "software", "tool_call_id": m.tool_call_id,
"content material": m.content material})
return out
async def stream(self, messages, instruments, on_event=None) -> AssistantTurn:
loop = asyncio.get_event_loop()
if self.api_format == "anthropic":
payload = {"mannequin": self.mannequin, "max_tokens": 1024,
"system": self.system,
"messages": self._msgs_anthropic(messages),
"instruments": self._tools_anthropic(instruments)}
headers = {"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"content-type": "utility/json"}
url = f"{self.base_url}/v1/messages"
information = await loop.run_in_executor(None, self._post, url, headers, payload)
textual content, calls = "", []
for block in information.get("content material", []):
if block.get("sort") == "textual content":
textual content += block["text"]
elif block.get("sort") == "tool_use":
calls.append(ToolName(block["id"], block["name"],
block.get("enter", {})))
u = information.get("utilization", {})
utilization = Usage(u.get("input_tokens", 0), u.get("output_tokens", 0))
cease = "tool_use" if calls else "end_turn"
else:
payload = {"mannequin": self.mannequin,
"messages": self._msgs_openai(messages),
"instruments": self._tools_openai(instruments)}
headers = {"Authorization": f"Bearer {self.api_key}",
"Content-Type": "utility/json"}
url = f"{self.base_url}/chat/completions"
information = await loop.run_in_executor(None, self._post, url, headers, payload)
selection = information["choices"][0]["message"]
textual content = selection.get("content material") or ""
calls = []
for tc in (selection.get("tool_calls") or []):
fn = tc["function"]
attempt:
cargs = json.hundreds(fn.get("arguments") or "{}")
besides json.JSONDecodeError:
cargs = {}
calls.append(ToolName(tc["id"], fn["name"], cargs))
u = information.get("utilization", {})
utilization = Usage(u.get("prompt_tokens", 0), u.get("completion_tokens", 0))
cease = "tool_use" if calls else "end_turn"
if on_event and textual content:
on_event("textual content", textual content)
for c in calls:
if on_event:
on_event("tool_call", c)
return AssistantTurn(textual content=textual content, tool_calls=calls,
stop_reason=cease, utilization=utilization)
def make_real_brain(system: str = "") -> LLMBrain | None:
"""Build an actual mind from env vars, or return None to fall again to mock."""
if os.environ.get("USE_REAL_LLM", "0") not in ("1", "true", "True"):
return None
if os.environ.get("ANTHROPIC_API_KEY"):
return RealLLMBrain(
api_format="anthropic",
mannequin=os.environ.get("MODEL", "claude-sonnet-4-6"),
api_key=os.environ["ANTHROPIC_API_KEY"],
base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com"),
system=system)
if os.environ.get("OPENAI_API_KEY"):
return RealLLMBrain(
api_format="openai",
mannequin=os.environ.get("MODEL", "gpt-4.1"),
api_key=os.environ["OPENAI_API_KEY"],
base_url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
system=system)
return None
We outline the mannequin mind layer that decides what the agent does subsequent within the loop. We create a scripted mock mind for deterministic execution, a flaky mind to simulate supplier errors, a retrying wrapper with exponential backoff, and an actual supplier mind for Anthropic- or OpenAI-compatible APIs. We use this snippet to present that the harness stays the identical whereas the intelligence layer can swap between mock execution and actual LLM calls.
Assembling the System Prompt and the QueryEngine Agent Loop
def assemble_system_prompt(*, base: str, project_context: str, reminiscence: str,
skills_summary: str, tool_names: checklist) -> str:
"""Mirror OpenHarness immediate meeting: base + CLAUDE.md + MEMORY.md +
on-demand ability checklist + out there instruments."""
components = [base.strip()]
if project_context:
components.append(f"## Project context (CLAUDE.md)n{project_context.strip()}")
if reminiscence:
components.append(f"## Long-term reminiscence (MEMORY.md)n{reminiscence.strip()}")
if skills_summary and skills_summary != "(none)":
components.append("## Available abilities (load on demand with the `ability` software)n"
+ skills_summary)
components.append("## Available toolsn" + ", ".be a part of(tool_names))
return "nn".be a part of(components)
def estimate_messages_tokens(messages: checklist) -> int:
return sum(count_tokens(m.content material) +
sum(count_tokens(json.dumps(c.arguments)) for c in m.tool_calls)
for m in messages)
def maybe_compact(messages: checklist, *, max_tokens: int, keep_last: int = 4,
verbose: bool = True) -> checklist:
"""Auto-compaction: when the transcript grows previous `max_tokens`, summarize
the older center into one observe whereas preserving the unique job and the
most up-to-date turns. Real OpenHarness asks the mannequin to write the abstract and
additionally preserves job state + channel logs; we use a heuristic right here."""
tok = estimate_messages_tokens(messages)
if tok <= max_tokens or len(messages) <= keep_last + 1:
return messages
first = messages[0]
tail = messages[-keep_last:]
center = messages[1:-keep_last]
info = []
for m in center:
if m.position == "software" and not m.content material.startswith("ERROR"):
info.append(f"{m.title}: {quick(m.content material, 80)}")
elif m.position == "assistant" and m.content material:
info.append(f"assistant: {quick(m.content material, 80)}")
abstract = Message(
position="system",
content material=("[Auto-compacted context] Earlier "
f"{len(center)} messages summarized. Key outcomes:n - "
+ "n - ".be a part of(info[-8:])),
)
compacted = [first, summary] + tail
if verbose:
print(f"
auto-compaction: {len(messages)} msgs "
f"(~{tok} tok) -> {len(compacted)} msgs "
f"(~{estimate_messages_tokens(compacted)} tok)")
return compacted
def console_printer(occasion: str, information) -> None:
if occasion == "textual content" and information:
print(f"
{quick(information, 400)}")
elif occasion == "tool_call":
print(f" ↳ name {information.title}({quick(json.dumps(information.arguments), 120)})")
elif occasion == "tool_result":
flag = "✗" if information.get("is_error") else "✓"
print(f" {flag} {quick(information['output'], 200)}")
class QueryEngine:
def __init__(self, *, mind: LLMBrain, registry: ToolRegistry,
ctx: ToolContext, perms: PermissionChecker, hooks: HookManager,
system_prompt: str, price: CostMeter | None = None,
approve=auto_approve, max_turns: int = 12,
compact_at_tokens: int | None = None):
self.mind = mind
self.registry = registry
self.ctx = ctx
self.perms = perms
self.hooks = hooks
self.system_prompt = system_prompt
self.price = price or CostMeter(getattr(mind, "mannequin", "default"))
self.approve = approve
self.max_turns = max_turns
self.compact_at_tokens = compact_at_tokens
async def execute_tool(self, name: ToolName, on_event=None) -> ToolResult:
software = self.registry.get(name.title)
if software is None:
return ToolResult(f"Unknown software: {name.title}", is_error=True)
resolution = self.perms.test(software, name.arguments)
if resolution.motion == "deny":
r = ToolResult(f"DENIED by permissions: {resolution.purpose}", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
if resolution.motion == "ask":
okay = await self.approve(software, name.arguments, resolution.purpose)
if not okay:
r = ToolResult("DENIED by person.", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
pre = self.hooks.run_pre(name, software, self.ctx)
if pre.blocked:
r = ToolResult(f"BLOCKED by hook: {pre.purpose}", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
args = pre.arguments
attempt:
outcome = await software.run(args, self.ctx)
besides Exception as e:
outcome = ToolResult(f"Tool raised: {e}", is_error=True)
outcome = self.hooks.run_post(name, software, outcome, self.ctx)
if on_event:
on_event("tool_result", {"output": outcome.output,
"is_error": outcome.is_error})
return outcome
async def run(self, job: str, on_event=console_printer) -> str:
messages: checklist = [Message(role="user", content=task)]
for flip in vary(self.max_turns):
if self.compact_at_tokens:
messages = maybe_compact(messages, max_tokens=self.compact_at_tokens)
assistant = await self.mind.stream(messages, self.registry.schemas(),
on_event)
self.price.add(assistant.utilization)
if assistant.stop_reason != "tool_use" or not assistant.tool_calls:
return assistant.textual content
messages.append(Message(position="assistant", content material=assistant.textual content,
tool_calls=assistant.tool_calls))
outcomes = await asyncio.collect(
*[self.execute_tool(tc, on_event) for tc in assistant.tool_calls]
)
for tc, res in zip(assistant.tool_calls, outcomes):
content material = res.output
if res.is_error and not content material.startswith(("DENIED", "BLOCKED")):
content material = "ERROR: " + content material
elif res.is_error:
content material = "ERROR: " + content material
messages.append(Message(position="software", title=tc.title,
tool_call_id=tc.id, content material=content material))
return "(stopped: reached max_turns)"
DEFAULT_TOOLS = [
WriteFileTool, ReadFileTool, EditTool, ListFilesTool, GrepTool,
RunPythonTool, ShellTool, WebSearchTool, SkillTool, RememberTool,
AskUserTool, SpawnAgentTool,
]
def build_registry(tool_classes=None) -> ToolRegistry:
reg = ToolRegistry()
for cls in (tool_classes or DEFAULT_TOOLS):
reg.register(cls())
return reg
BASE_SYSTEM = (
"You are an autonomous coding/analysis agent operating inside OpenHarness. "
"Use instruments to take actual actions. Think step-by-step, confirm your work by "
"operating it, and solely cease when the duty is really full."
)
We assemble the system immediate, estimate transcript measurement, compact lengthy conversations, print streaming occasions, and outline the primary QueryEngine. We make the engine accountable for asking the mind what to do, checking permissions, operating hooks, executing instruments, gathering outcomes, and looping till the duty is completed. We additionally register the default instruments and create the bottom system instruction that guides the agent towards verified and tool-based work.
Running the Agent Loop, Permission Governance, and On-Demand Skills Demos
async def demo_agent_loop():
clarify(
"DEMO 1 — The Agent Loop (write code → run → see it fail → repair → cross)",
"""This is the centerpiece. A scripted 'mind' drives the actual harness:
every flip it emits software calls, the engine runs them via permissions +
hooks, feeds outcomes again, and the mind REACTS. It writes a factorial
module with an off-by-one bug, writes a take a look at, runs it, sees the failure
within the software outcome, fixes the bug, re-runs to inexperienced, and saves a observe to
reminiscence. We additionally wrap the mind in retry/backoff and a one-time transient
failure so you may watch restoration — and we observe token price all through.""")
buggy = (
"def factorial(n):n"
" outcome = 1n"
" for i in vary(1, n):n"
" outcome *= in"
" return resultn"
)
test_code = (
'assert factorial(0) == 1, "0! must be 1"n'
'assert factorial(5) == 120, "5! must be 120 however bought " + str(factorial(5))n'
'print("All assessments handed: 0!=1 and 5!=120")n'
)
def reactive_fix(messages):
outs = last_tool_results(messages)
failed = any("ERROR" in o["content"] or "Assert" in o["content"]
for o in outs)
if failed:
return Use("The take a look at failed — basic off-by-one within the loop vary. "
"vary(1, n) stops at n-1, so I'll make it vary(1, n + 1).",
[("edit", {"path": "mathutils.py",
"old": "range(1, n)",
"new": "range(1, n + 1)"})])
return Use("Tests already cross; re-running to affirm.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})])
script = [
Use("First I'll create the factorial module.",
[("write_file", {"path": "mathutils.py", "content": buggy})]),
Use("Now a take a look at that pins down the anticipated conduct.",
[("write_file", {"path": "test_math.py", "content": test_code})]),
Use("Let me run the take a look at to see if it really works.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
reactive_fix,
Use("Re-running the take a look at after the repair.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
Use("I'll keep in mind the lesson for subsequent time.",
[("remember",
{"note": "factorial(n): loop must be range(1, n+1); "
"range(1, n) gives n!/n (off-by-one)."})]),
lambda m: Say("Done
Created mathutils.py with factorial(), wrote a "
"take a look at, caught an off-by-one bug (vary(1, n) → "
"vary(1, n + 1)), fastened it, and the take a look at now passes. "
"Saved the lesson to reminiscence."),
]
vfs = VirtualFS()
reminiscence = MemoryRetailer(os.path.be a part of(tempfile.gettempdir(), "oh_demo1_mem.md"))
reminiscence.reset()
abilities = SkillLibrary()
registry = build_registry()
ctx = ToolContext(vfs=vfs, reminiscence=reminiscence, abilities=abilities)
hooks = HookManager()
hooks.add_pre(lambda name, software, c:
print(f"
pre-hook noticed {software.title}") or None)
mind = Remaking an attemptBrain(FlakyBrain(ScriptedBrain(script), fail_times=1))
price = CostMeter("mock-sonnet")
engine = QueryEngine(
mind=mind, registry=registry, ctx=ctx,
perms=PermissionChecker(PermissionMode.DEFAULT),
hooks=hooks,
system_prompt=assemble_system_prompt(
base=BASE_SYSTEM, project_context="", reminiscence=reminiscence.learn(),
skills_summary=abilities.abstract(), tool_names=registry.names()),
price=price, approve=auto_approve, max_turns=10,
)
print("n[running the agent loop]n")
remaining = await engine.run("Implement factorial() with a take a look at; make assessments cross.")
print(f"n FINAL ANSWER:n {remaining}")
print(f"n Virtual filesystem now incorporates:n{vfs.tree()}")
print(f"n Final mathutils.py:n" +
textwrap.indent(vfs.learn("mathutils.py"), " "))
print(f"n
price: {price.abstract()}")
print("n TAKEAWAY: the mannequin solely decides WHAT to do; the harness handles "
"HOW — permissions, hooks, execution, retries, outcomes, and price.")
async def demo_permissions():
clarify(
"DEMO 2 — Governance: permission modes, path guidelines, denied instructions",
"""The harness is the security boundary. The identical write_file software behaves
otherwise below default/auto/plan modes; delicate paths are all the time
denied; path guidelines and denied-command patterns add fine-grained management;
and PreToolUse hooks can veto a name outright.""")
write = WriteFileTool()
shell = ShellTool()
print("n (a) Same write below every mode:")
for mode in (PermissionMode.DEFAULT, PermissionMode.AUTO, PermissionMode.PLAN):
d = PermissionChecker(mode).test(write, {"path": "notes.txt", "content material": "x"})
print(f" {mode.worth:8s} -> {d.motion.higher():5s} ({d.purpose})")
print("n (b) Built-in sensitive-path safety (any mode):")
for p in ["app/main.py", ".env", "~/.ssh/id_rsa", "secrets/key.txt"]:
d = PermissionChecker(PermissionMode.AUTO).test(write, {"path": p, "content material": "x"})
print(f" write {p:18s} -> {d.motion.higher():5s} ({d.purpose})")
print("n (c) Custom path_rules + denied_commands:")
laptop = PermissionChecker(
PermissionMode.AUTO,
path_rules=[{"pattern": "build/*", "allow": False}],
denied_commands=[r"rms+-rfs+/", r"DROPs+TABLE"])
print(" write construct/x ->",
laptop.test(write, {"path": "construct/x", "content material": "x"}).motion.higher())
print(" shell 'rm -rf /' ->",
laptop.test(shell, {"command": "rm -rf /"}).motion.higher())
print(" shell 'ls -la' ->",
laptop.test(shell, {"command": "ls -la"}).motion.higher())
print("n (d) A PreToolUse safety hook that vetoes writing credentials:")
def security_hook(name, software, ctx):
if software.title == "write_file" and "password" in name.arguments.get("content material", "").decrease():
return HookOutcome(blocked=True, purpose="content material appears like a secret")
return None
hooks = HookManager().add_pre(security_hook)
ctx = ToolContext(vfs=VirtualFS())
engine = QueryEngine(mind=ScriptedBrain([]), registry=build_registry(),
ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
hooks=hooks, system_prompt="", approve=auto_approve)
r = await engine.execute_tool(
ToolName("c1", "write_file",
{"path": "config.txt", "content material": "password=hunter2"}),
on_event=console_printer)
print(f" outcome -> {r.output}")
print("n TAKEAWAY: permissions + hooks kind layered protection round each "
"single software name, impartial of how intelligent the mannequin is.")
async def demo_skills():
clarify(
"DEMO 3 — Skills: on-demand data loading",
"""Skills are markdown playbooks. Only their title+description sit within the
system immediate (low cost); the complete physique is pulled in ONLY when the mannequin
decides it wants it, through the `ability` software. This retains context small whereas
giving the agent deep, swappable experience. Compatible in spirit with
anthropics/abilities (drop a .md file in and it simply works).""")
abilities = SkillLibrary()
abilities.add_markdown(textwrap.dedent("""
---
title: commit
description: Create clear, typical git commits
---
# Commit ability
2. Group associated adjustments; write a Conventional Commit topic
(feat:, repair:, docs:, refactor:, take a look at:).
3. Keep the topic <= 50 chars; clarify *why* within the physique.
"""))
abilities.add_markdown(textwrap.dedent("""
---
title: evaluate
description: Review code for bugs, safety, and readability
---
# Review ability
Check: correctness, edge instances, enter validation, error dealing with,
secrets and techniques in code, naming, and take a look at protection. Be particular and sort.
"""))
registry = build_registry()
ctx = ToolContext(vfs=VirtualFS(), abilities=abilities,
reminiscence=MemoryRetailer(os.path.be a part of(tempfile.gettempdir(),
"oh_d3.md")))
sysprompt = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", reminiscence="",
skills_summary=abilities.abstract(), tool_names=registry.names())
print(" System immediate's abilities part (names+descriptions solely):")
print(textwrap.indent(abilities.abstract(), " "))
print(f"n Loaded ability our bodies to date: {abilities.loaded} (none but!)")
script = [
Use("This is a commit task — I'll load the commit skill first.",
[("skill", {"name": "commit"})]),
lambda m: Say("Following the commit playbook I simply loaded, I'd stage the "
"associated adjustments and write: 'repair: right factorial "
"off-by-one in vary sure'."),
]
engine = QueryEngine(mind=ScriptedBrain(script), registry=registry, ctx=ctx,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt=sysprompt)
print("n[running]n")
remaining = await engine.run("Help me commit my adjustments correctly.")
print(f"n FINAL: {remaining}")
print(f"n Loaded ability our bodies now: {abilities.loaded} (loaded on demand)")
print("n TAKEAWAY: data is paged in just-in-time, not crammed into "
"each immediate.")
We implement the primary three demos to present the harness in motion. We first run a whole agent loop the place the system writes buggy factorial code, assessments it, detects the failure, fixes the bug, reruns the take a look at, and saves the lesson to reminiscence. We then discover permission governance and on-demand abilities to see how security controls and reusable playbooks enhance the agent workflow.
Adding Memory, Context Compaction, and Multi-Agent Coordination
async def demo_memory():
clarify(
"DEMO 4 — Memory: persistent MEMORY.md throughout periods",
"""Long-term reminiscence survives between runs by persisting to MEMORY.md. In
session 1 the agent data a person choice; in a brand-new session 2
(contemporary engine, contemporary transcript) that reminiscence is injected into the system
immediate, so the agent already 'is aware of' the person.""")
mem_path = os.path.be a part of(tempfile.gettempdir(), "oh_demo4_MEMORY.md")
reminiscence = MemoryRetailer(mem_path)
reminiscence.reset()
registry = build_registry()
print(" ── Session 1 ──")
ctx1 = ToolContext(vfs=VirtualFS(), reminiscence=reminiscence, abilities=SkillLibrary())
s1 = [
Use("I'll remember the user's stated preferences.",
[("remember", {"note": "User prefers metric units and concise answers."})]),
lambda m: Say("Noted your preferences for subsequent time."),
]
eng1 = QueryEngine(mind=ScriptedBrain(s1), registry=registry, ctx=ctx1,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(),
system_prompt=assemble_system_prompt(
base=BASE_SYSTEM, project_context="",
reminiscence=reminiscence.learn(),
skills_summary="(none)", tool_names=registry.names()))
await eng1.run("Remember that I like metric models and quick solutions.")
print(f" MEMORY.md is now:n{textwrap.indent(reminiscence.learn(), ' ')}")
print("n ── Session 2 (new session, reminiscence reloaded from disk) ──")
memory2 = MemoryRetailer(mem_path)
ctx2 = ToolContext(vfs=VirtualFS(), reminiscence=memory2, abilities=SkillLibrary())
sysprompt2 = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", reminiscence=memory2.learn(),
skills_summary="(none)", tool_names=registry.names())
print(" The new system immediate already incorporates:")
print(textwrap.indent("## Long-term reminiscence (MEMORY.md)n" + memory2.learn(),
" "))
s2 = [lambda m: Say("Since you prefer metric and brevity: it's about 5 km.
")]
eng2 = QueryEngine(mind=ScriptedBrain(s2), registry=registry, ctx=ctx2,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt=sysprompt2)
remaining = await eng2.run("How far is a 5000 meter run, roughly?")
print(f"n FINAL: {remaining}")
print("n TAKEAWAY: state that ought to outlive a dialog goes to reminiscence, "
"then is re-injected at the beginning of future periods.")
async def demo_compaction():
clarify(
"DEMO 5 — Context auto-compaction (multi-day periods with out overflow)",
"""As a session grows, the transcript can blow previous the context window.
Auto-compaction summarizes the older center of the dialog right into a
compact observe whereas preserving the unique job and the latest
turns — so long-running brokers preserve going. (We power a tiny threshold to
set off it; actual OpenHarness asks the mannequin to write the abstract.)""")
msgs = [Message(role="user", content="Build and verify a data pipeline.")]
for i in vary(8):
msgs.append(Message(position="assistant", content material=f"Step {i}: doing work...",
tool_calls=[ToolCall(f"c{i}", "shell",
{"command": f"process chunk {i}"})]))
msgs.append(Message(position="software", title="shell", tool_call_id=f"c{i}",
content material=f"chunk {i} processed: 1000 rows okay " * 4))
earlier than = estimate_messages_tokens(msgs)
print(f" Before: {len(msgs)} messages, ~{earlier than} tokens")
compacted = maybe_compact(msgs, max_tokens=300, keep_last=4)
after = estimate_messages_tokens(compacted)
print(f" After: {len(compacted)} messages, ~{after} tokens "
f"({100 * (earlier than - after) // earlier than}% smaller)")
print("n The injected abstract message:")
print(textwrap.indent(compacted[1].content material, " "))
print("n TAKEAWAY: the harness manages the context window so the agent can "
"run far longer than a single window permits.")
async def demo_multi_agent():
clarify(
"DEMO 6 — Swarm coordination: spawning parallel subagents",
"""A lead agent decomposes a job and delegates to specialised subagents.
Each subagent is its OWN harness (personal loop, personal mind, personal instruments). Two
researchers run IN PARALLEL (issued in the identical flip → asyncio.collect),
then a author synthesizes their findings. The crew registry tracks who
did what.""")
def researcher_profile():
reg = build_registry([WebSearchTool])
script = [
Use("Researching via web search.",
[("web_search", {"query": "PLACEHOLDER"})]),
lambda m: Say("Summary: " +
quick(last_tool_results(m)[0]["content"], 160)),
]
return ScriptedBrain(script), reg
def writer_profile():
reg = build_registry([WriteFileTool])
script = [lambda m: Say("Synthesized brief combining both research notes "
"into a coherent paragraph.")]
return ScriptedBrain(script), reg
profiles = {"researcher": researcher_profile, "author": writer_profile}
vfs = VirtualFS()
reminiscence = MemoryRetailer(os.path.be a part of(tempfile.gettempdir(), "oh_d6.md"))
abilities = SkillLibrary()
crew: checklist = []
def make_spawn():
async def spawn(position: str, job: str) -> str:
manufacturing facility = profiles.get(position)
if not manufacturing facility:
return f"(no such position: {position})"
child_brain, child_reg = manufacturing facility()
if position == "researcher" and child_brain.script:
child_brain.script[0] = Use(f"Researching: {job}",
[("web_search", {"query": task})])
child_ctx = ToolContext(vfs=vfs, reminiscence=reminiscence, abilities=abilities,
spawn=spawn)
child_engine = QueryEngine(
mind=child_brain, registry=child_reg, ctx=child_ctx,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt="(subagent)",
approve=auto_approve, max_turns=6)
print(f"
spawned [{role}] for: {quick(job, 60)}")
outcome = await child_engine.run(job, on_event=None)
crew.append({"position": position, "job": job, "outcome": outcome})
return outcome
return spawn
ctx = ToolContext(vfs=vfs, reminiscence=reminiscence, abilities=abilities, spawn=make_spawn())
registry = build_registry()
lead_script = [
Use("I'll split this: research vector databases AND agent harnesses in "
"parallel, then have a writer combine the findings.",
[("spawn_agent", {"role": "researcher",
"task": "vector database for RAG"}),
("spawn_agent", {"role": "researcher",
"task": "agent harness design"})]),
Use("Both analysis notes are in — delegating synthesis to the author.",
[("spawn_agent", {"role": "writer",
"task": "combine the two research notes"})]),
lambda m: Say("Coordination full: 2 researchers (parallel) + 1 "
"author produced a mixed temporary."),
]
engine = QueryEngine(mind=ScriptedBrain(lead_script), registry=registry,
ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt="(lead agent)",
max_turns=8)
print("n[running the lead agent]n")
t0 = time.time()
remaining = await engine.run("Produce a brief temporary on constructing RAG brokers.")
dt = time.time() - t0
print(f"n FINAL: {remaining}")
print(f"n Team registry ({len(crew)} subagent runs, whole {dt:.3f}s):")
for entry in crew:
print(f" - [{entry['role']}] {quick(entry['task'], 40)} -> "
f"{quick(entry['result'], 80)}")
print("n TAKEAWAY: the identical loop nests — a 'software' could be an complete agent, "
"enabling parallel groups and delegation.")
async def demo_real_provider():
clarify(
"DEMO 7 — Swap in a REAL mannequin (Anthropic / OpenAI-compatible)",
"""Everything above ran on a deterministic mock mind — zero keys, zero
price. Going reside adjustments precisely ONE factor: the mind. The engine, instruments,
permissions, hooks, abilities, reminiscence, and coordinator are untouched. This
is the entire level of a harness: the mannequin is pluggable.""")
print(textwrap.dedent("""
To run the SAME harness on an actual mannequin, set surroundings variables and
re-run (works with any OpenAI- or Anthropic-compatible endpoint that
OpenHarness helps: Claude, GPT, Kimi, GLM, DeepSeek, Qwen, Groq,
Ollama, OpenRouter, ...):
import os
os.environ["USE_REAL_LLM"] = "1"
# --- Anthropic-style ---
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
os.environ["MODEL"] = "claude-sonnet-4-6"
# --- or OpenAI-style (incl. native Ollama) ---
# os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["OPENAI_BASE_URL"] = "http://localhost:11434/v1"
# os.environ["MODEL"] = "llama-3.3-70b"
Then construct the engine with the actual mind as a substitute of the mock:
mind = make_real_brain(system=system_prompt) or ScriptedBrain([...])
engine = QueryEngine(mind=mind, registry=registry, ctx=ctx, ...)
await engine.run("Refactor utils.py and add assessments.")
"""))
sysprompt = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", reminiscence="",
skills_summary="(none)", tool_names=build_registry().names())
actual = make_real_brain(system=sysprompt)
if actual is None:
print(" [USE_REAL_LLM not set → staying on the mock brain. "
"Set the env vars above and re-run to go live.]")
return
print(f" [LIVE] Using actual supplier: {actual.api_format} / {actual.mannequin}n")
vfs = VirtualFS()
ctx = ToolContext(vfs=vfs, reminiscence=MemoryRetailer(
os.path.be a part of(tempfile.gettempdir(), "oh_real.md")),
abilities=SkillLibrary(), canned_answers={})
engine = QueryEngine(
mind=Remaking an attemptBrain(actual), registry=build_registry(), ctx=ctx,
perms=PermissionChecker(PermissionMode.AUTO), hooks=HookManager(),
system_prompt=sysprompt, price=CostMeter(actual.mannequin), max_turns=12)
remaining = await engine.run(
"Create greet.py with a operate greet(title) that returns "
"'Hello, <title>!', then write and run a fast take a look at to show it really works.")
print(f"n FINAL: {remaining}")
print(f"n Files:n{vfs.tree()}")
print(f"n
{engine.price.abstract()}")
async def important():
banner("OpenHarness From Scratch — guided walkthrough")
print(textwrap.dedent("""
We will construct up the harness one subsystem at a time:
1. The agent loop (instruments, run/confirm/repair, retries, price)
2. Permissions (modes, delicate paths, guidelines, hook veto)
3. Skills (on-demand data)
4. Memory (persistent MEMORY.md throughout periods)
5. Compaction (surviving lengthy periods)
6. Multi-agent (parallel subagent delegation)
7. Real supplier (one-line swap to a reside mannequin)
Architecture (what every bit is accountable for):
User immediate
│
▼
QueryEngine ──► LLM mind (mock or actual) "WHAT to do"
│ ▲ │ tool_use
│ └────────────┘
▼
For every software name: Permission ─► PreHook ─► Execute ─► PostHook
│ │ │ │
deny/ask veto/edit sandbox redact
│
▼
Tool outcome ──► again into the transcript ──► loop
""").rstrip())
await demo_agent_loop()
await demo_permissions()
await demo_skills()
await demo_memory()
await demo_compaction()
await demo_multi_agent()
await demo_real_provider()
banner("All demos full
")
print(textwrap.dedent("""
You simply constructed the core of an agent harness:
• a streaming tool-call loop with retries & price monitoring
• type-validated, self-describing instruments
• layered governance (permission modes + lifecycle hooks)
• on-demand abilities and persistent reminiscence
• context auto-compaction
• nested multi-agent coordination
• a one-line swap to an actual LLM supplier
To go deeper, research the actual venture: https://github.com/HKUDS/OpenHarness
(43+ instruments, plugin ecosystem, MCP shopper, React/Ink TUI, the `oh` CLI,
and the `ohmo` private agent). "The mannequin is the agent; the code is the
harness."
"""))
run_async(important())
We full the tutorial with reminiscence, context compaction, multi-agent coordination, real-provider switching, and the ultimate guided walkthrough. We show how reminiscence persists throughout periods, how outdated context will get summarized, and how a lead agent delegates work to parallel subagents earlier than synthesizing outcomes. We end by operating all demos via the primary operate, yielding a whole, runnable view of an OpenHarness-style agent system.
Conclusion
In conclusion, we’ve a whole hands-on understanding of how an agent harnesses language-model reasoning with actual, managed actions. We noticed how every subsystem contributes to reliability: instruments make the agent succesful, permissions preserve execution secure, hooks add governance, reminiscence preserves helpful preferences, abilities load data solely when wanted, and compaction helps longer periods keep manageable. We additionally explored how the identical harness can assist deterministic mock brains, actual LLM suppliers, and nested subagents that work collectively on delegated duties.
Check out the Full Codes here. Also, be happy to observe us on Twitter and don’t overlook to be a part of our 150k+ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Need to accomplice with us for selling your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar and so forth.? Connect with us
The submit How to Design an OpenHarness Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination appeared first on MarkTechPost.
