How to Design an OpenHarness Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination

In this tutorial, we build OpenHarness from scratch to better understand how a practical agent harness works. We recreate the main building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the complete control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.
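The control flow described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the harness built in the rest of the tutorial: `scripted_model`, `run_loop`, and the single `add` tool are hypothetical names, and the "model" is a hard-coded function standing in for an LLM.

```python
# Hypothetical, stripped-down harness loop; all names are illustrative only.
def add(a: int, b: int) -> str:
    return str(a + b)

TOOLS = {"add": add}

def scripted_model(transcript: list) -> dict:
    """Stand-in for an LLM: call `add` once, then answer from the observation."""
    observations = [m for m in transcript if m["role"] == "tool"]
    if not observations:
        return {"text": "I need to add the numbers.",
                "calls": [("add", {"a": 2, "b": 3})]}
    return {"text": f"The answer is {observations[-1]['content']}.", "calls": []}

def run_loop(task: str, max_turns: int = 5) -> str:
    transcript = [{"role": "user", "content": task}]
    for _ in range(max_turns):
        turn = scripted_model(transcript)
        transcript.append({"role": "assistant", "content": turn["text"]})
        if not turn["calls"]:            # no tool calls: the task is complete
            return turn["text"]
        for name, args in turn["calls"]:
            tool = TOOLS.get(name)
            if tool is None:             # validate before executing
                result = f"unknown tool: {name}"
            else:
                result = tool(**args)
            # Feed the observation back so the next turn can use it.
            transcript.append({"role": "tool", "name": name, "content": result})
    return "(turn limit reached)"

final_answer = run_loop("What is 2 + 3?")
print(final_answer)
```

The `max_turns` cap is a design choice of this sketch: it keeps a misbehaving model from looping forever.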

Setting Up the OpenHarness Core

from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import types
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum
MISSING = dataclasses.MISSING
UnionType = getattr(types, "UnionType", None)
def run_async(coro):
   """Run a coroutine to completion from sync code, even inside a live loop."""
   try:
       loop = asyncio.get_running_loop()
   except RuntimeError:
       loop = None
   if loop is not None and loop.is_running():
       try:
           import nest_asyncio
           nest_asyncio.apply()
           return loop.run_until_complete(coro)
       except Exception:
           # Fallback: run the coroutine on a fresh loop in a worker thread.
           import threading
           box: dict = {}
           def _runner():
               new_loop = asyncio.new_event_loop()
               try:
                   box["value"] = new_loop.run_until_complete(coro)
               finally:
                   new_loop.close()
           t = threading.Thread(target=_runner)
           t.start()
           t.join()
           return box["value"]
   return asyncio.run(coro)
BANNER = "═" * 78
def banner(title: str) -> None:
   print("\n" + BANNER)
   print(f"  {title}")
   print(BANNER)
def explain(title: str, body: str) -> None:
   banner(title)
   print(textwrap.fill(textwrap.dedent(body).strip(), width=78))
   print("-" * 78)
def short(text: str, n: int = 240) -> str:
   # Collapse whitespace, then truncate to n characters with an ellipsis.
   text = " ".join(str(text).split())
   return text if len(text) <= n else text[: n - 1] + "…"
@dataclass
class Usage:
   input_tokens: int = 0
   output_tokens: int = 0
   def __add__(self, other: "Usage") -> "Usage":
       return Usage(self.input_tokens + other.input_tokens,
                    self.output_tokens + other.output_tokens)
@dataclass
class ToolCall:
   id: str
   name: str
   arguments: dict
@dataclass
class AssistantTurn:
   """One turn produced by the model: some text + zero or more tool calls."""
   text: str = ""
   tool_calls: list = field(default_factory=list)
   stop_reason: str = "end_turn"
   usage: Usage = field(default_factory=Usage)
@dataclass
class Message:
   """A single message in the running conversation transcript."""
   role: str
   content: str = ""
   tool_calls: list = field(default_factory=list)
   tool_call_id: str = ""
   name: str = ""
def count_tokens(text: str) -> int:
   """Cheap, provider-agnostic token estimate (~4 chars/token)."""
   if not text:
       return 0
   return max(1, round(len(text) / 4))
PRICE_BOOK = {
   "mock-sonnet":     (3.00, 15.00),
   "claude-sonnet-4": (3.00, 15.00),
   "gpt-4.1":         (2.00,  8.00),
   "default":         (1.00,  3.00),
}
class CostMeter:
   """Accumulates token usage and converts it to an estimated dollar cost."""
   def __init__(self, model: str):
       self.model = model
       self.total = Usage()
       self.calls = 0
   def add(self, usage: Usage) -> None:
       self.total = self.total + usage
       self.calls += 1
   @property
   def dollars(self) -> float:
       # PRICE_BOOK entries are (input, output) prices, applied per million tokens.
       pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])
       return ((self.total.input_tokens / 1e6) * pin +
               (self.total.output_tokens / 1e6) * pout)
   def summary(self) -> str:
       return (f"{self.calls} model call(s) | "
               f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "
               f"~${self.dollars:.5f} ({self.model})")
def fld(description: str = "", default=MISSING, default_factory=MISSING):
   """Declare a tool-input field with a description (and optional default)."""
   md = {"description": description}
   if default_factory is not MISSING:
       return field(default_factory=default_factory, metadata=md)
   if default is not MISSING:
       return field(default=default, metadata=md)
   return field(metadata=md)
def _is_optional(t) -> bool:
   origin = typing.get_origin(t)
   if origin is typing.Union or (UnionType is not None and origin is UnionType):
       return type(None) in typing.get_args(t)
   return False
def _py_to_json_type(t) -> dict:
   origin = typing.get_origin(t)
   if origin is typing.Union or (UnionType is not None and origin is UnionType):
       # Optional[X] / X | None: describe the first non-None member.
       args = [a for a in typing.get_args(t) if a is not type(None)]
       return _py_to_json_type(args[0]) if args else {"type": "string"}
   if t is str:
       return {"type": "string"}
   if t is bool:
       return {"type": "boolean"}
   if t is int:
       return {"type": "integer"}
   if t is float:
       return {"type": "number"}
   if origin is list or t is list:
       args = typing.get_args(t)
       item = _py_to_json_type(args[0]) if args else {"type": "string"}
       return {"type": "array", "items": item}
   if origin is dict or t is dict:
       return {"type": "object"}
   return {"type": "string"}
def build_json_schema(model_cls) -> dict:
   """Turn a dataclass input model into a JSON Schema (object with properties)."""
   hints = typing.get_type_hints(model_cls)
   props, required = {}, []
   for f in dataclasses.fields(model_cls):
       t = hints.get(f.name, str)
       js = dict(_py_to_json_type(t))
       desc = f.metadata.get("description", "")
       if desc:
           js["description"] = desc
       props[f.name] = js
       has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)
       if not has_default and not _is_optional(t):
           required.append(f.name)
   schema = {"type": "object", "properties": props}
   if required:
       schema["required"] = required
   return schema
def _coerce(v, t):
   origin = typing.get_origin(t)
   if origin is typing.Union or (UnionType is not None and origin is UnionType):
       if v is None:
           return None
       args = [a for a in typing.get_args(t) if a is not type(None)]
       return _coerce(v, args[0]) if args else v
   if t is str:
       return v if isinstance(v, str) else str(v)
   if t is bool:
       if isinstance(v, bool):
           return v
       if isinstance(v, str):
           return v.strip().lower() in ("1", "true", "yes", "y", "on")
       return bool(v)
   if t is int:
       return int(v)
   if t is float:
       return float(v)
   if origin is list or t is list:
       args = typing.get_args(t)
       it = args[0] if args else str
       if not isinstance(v, list):
           v = [v]
       return [_coerce(x, it) for x in v]
   if origin is dict or t is dict:
       return dict(v) if v else {}
   return v
def instantiate(model_cls, raw: dict):
   """Validate + coerce raw JSON args into a typed input instance."""
   hints = typing.get_type_hints(model_cls)
   raw = raw or {}
   kwargs = {}
   for f in dataclasses.fields(model_cls):
       t = hints.get(f.name, str)
       if f.name in raw and raw[f.name] is not None:
           try:
               kwargs[f.name] = _coerce(raw[f.name], t)
           except (TypeError, ValueError) as e:
               raise ValueError(f"Bad value for '{f.name}': {e}")
       elif f.default is not MISSING:
           kwargs[f.name] = f.default
       elif f.default_factory is not MISSING:
           kwargs[f.name] = f.default_factory()
       elif _is_optional(t):
           kwargs[f.name] = None
       else:
           raise ValueError(f"Missing required argument '{f.name}'")
   return model_cls(**kwargs)
class PermissionKind(Enum):
   """How dangerous a tool is; drives the default permission policy."""
   READ = "read"
   WRITE = "write"
   EXECUTE = "execute"
   META = "meta"
@dataclass
class ToolResult:
   output: str
   is_error: bool = False
   metadata: dict = field(default_factory=dict)
class ToolContext:
   """Everything a tool may need at runtime (services + shared state)."""
   def __init__(self, **services):
       self.__dict__.update(services)
class BaseTool:
   """Base class for all tools. Subclasses set name/description/InputModel/kind
   and implement `execute`. Schema + validation are handled here."""
   name: str = "base"
   description: str = ""
   InputModel = None
   kind: PermissionKind = PermissionKind.READ
   def schema(self) -> dict:
       return {
           "name": self.name,
           "description": self.description,
           "kind": self.kind.value,
           "input_schema": (build_json_schema(self.InputModel)
                            if self.InputModel else
                            {"type": "object", "properties": {}}),
       }
   async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
       args = instantiate(self.InputModel, raw_args) if self.InputModel else None
       return await self.execute(args, ctx)
   async def execute(self, args, ctx: ToolContext) -> ToolResult:
       raise NotImplementedError
class ToolRegistry:
   def __init__(self):
       self._tools: dict = {}
   def register(self, tool: BaseTool) -> "ToolRegistry":
       self._tools[tool.name] = tool
       return self
   def get(self, name: str) -> BaseTool | None:
       return self._tools.get(name)
   def schemas(self) -> list:
       return [t.schema() for t in self._tools.values()]
   def names(self) -> list:
       return list(self._tools)
class VirtualFS:
   """In-memory filesystem. Keeps the tutorial safe & deterministic in Colab."""
   def __init__(self):
       self.files: dict = {}
   @staticmethod
   def norm(path: str) -> str:
       return path.lstrip("./").strip()
   def write(self, path: str, content: str) -> None:
       self.files[self.norm(path)] = content
   def read(self, path: str) -> str:
       return self.files[self.norm(path)]
   def exists(self, path: str) -> bool:
       return self.norm(path) in self.files
   def list(self, pattern: str = "*") -> list:
       return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))
   def tree(self) -> str:
       if not self.files:
           return "(empty)"
       return "\n".join(f"  {p}  ({len(c)} bytes)"
                        for p, c in sorted(self.files.items()))
class PermissionMode(Enum):
   DEFAULT = "default"
   AUTO = "auto"
   PLAN = "plan"
@dataclass
class PermissionDecision:
   action: str
   reason: str = ""
SENSITIVE_PATTERNS = [
   "/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
   "*credentials*", "*.env", "*/secrets/*",
]
class PermissionChecker:
   def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
                path_rules: list | None = None,
                denied_commands: list | None = None):
       self.mode = mode
       self.path_rules = path_rules or []
       self.denied_commands = denied_commands or []
   def _check_path(self, path: str) -> PermissionDecision | None:
       # Built-in sensitive patterns always win over user path rules.
       for pat in SENSITIVE_PATTERNS:
           if fnmatch.fnmatch(path, pat):
               return PermissionDecision("deny", f"sensitive path '{path}' ({pat})")
       for rule in self.path_rules:
           if fnmatch.fnmatch(path, rule["pattern"]):
               if rule.get("allow", True):
                   return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'")
               return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'")
       return None
   def _check_command(self, command: str) -> PermissionDecision | None:
       for pat in self.denied_commands:
           if re.search(pat, command):
               return PermissionDecision("deny", f"denied command matched /{pat}/")
       return None
   def check(self, tool: BaseTool, args: dict) -> PermissionDecision:
       if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
           d = self._check_path(str(args["path"]))
           if d:
               return d
       if "command" in args:
           d = self._check_command(str(args["command"]))
           if d:
               return d
       if self.mode is PermissionMode.AUTO:
           return PermissionDecision("allow", "auto mode")
       if self.mode is PermissionMode.PLAN:
           if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
               return PermissionDecision("deny", "plan mode blocks writes/executes")
           return PermissionDecision("allow", "plan mode allows reads")
       if tool.kind in (PermissionKind.READ, PermissionKind.META):
           return PermissionDecision("allow", "safe tool")
       return PermissionDecision("ask", f"{tool.kind.value} requires approval")
async def auto_approve(tool, args, reason) -> bool:
   print(f"        🔐 approval needed: {tool.name} ({reason}) -> [auto-approved]")
   return True
async def interactive_approve(tool, args, reason) -> bool:
   ans = input(f"        🔐 Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] ")
   return ans.strip().lower().startswith("y")
@dataclass
class HookOutcome:
   blocked: bool = False
   reason: str = ""
   arguments: dict | None = None
class HookManager:
   """Lifecycle events around every tool call (like PreToolUse/PostToolUse)."""
   def __init__(self):
       self.pre: list = []
       self.post: list = []
   def add_pre(self, fn):
       self.pre.append(fn)
       return self
   def add_post(self, fn):
       self.post.append(fn)
       return self
   def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:
       args = dict(call.arguments)
       for fn in self.pre:
           out = fn(call, tool, ctx)
           if out is None:
               continue
           if out.blocked:
               return out
           if out.arguments is not None:
               args = out.arguments
       return HookOutcome(arguments=args)
   def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:
       for fn in self.post:
           new = fn(call, tool, result, ctx)
           if new is not None:
               result = new
       return result

We start by establishing the foundation for the OpenHarness-style tutorial, including imports, async execution support, helper functions, and core data models. We define messages, tool calls, usage tracking, token counting, cost estimation, permission modes, hooks, and the virtual filesystem that keeps execution safe. We use this snippet to set up the basic architecture on which all subsequent tools, agent loops, and demos rely.
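To make the permission modes concrete, here is a minimal standalone sketch of the allow/deny/ask decision. It follows the same ordering as the checker above (sensitive paths first, then the mode, then the tool's risk level), but the function name `decide`, the plain-string modes and kinds, and the shortened pattern list are simplifications assumed for illustration.

```python
import fnmatch

# Shortened, illustrative pattern list (an assumption of this sketch).
SENSITIVE = ["*/.ssh/*", "*.pem", "*.env"]

def decide(mode: str, kind: str, path: str = "") -> str:
    """Return 'allow', 'deny', or 'ask' for a single tool call."""
    risky = kind in ("write", "execute")
    # 1. Sensitive paths are denied for risky tools regardless of mode.
    if risky and any(fnmatch.fnmatch(path, pat) for pat in SENSITIVE):
        return "deny"
    # 2. Auto mode approves everything that survived step 1.
    if mode == "auto":
        return "allow"
    # 3. Plan mode is read-only.
    if mode == "plan":
        return "deny" if risky else "allow"
    # 4. Default mode: safe tools pass, risky tools need human approval.
    return "ask" if risky else "allow"

for case in [("default", "read", "notes.txt"),
             ("default", "write", "notes.txt"),
             ("auto", "write", "prod.env"),
             ("plan", "execute", "")]:
    print(case, "->", decide(*case))
```

Checking sensitive paths before the mode is what keeps `auto` from becoming a blanket override.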

Building the Tool Layer

@dataclass
class WriteFileInput:
   path: str = fld("File path to write")
   content: str = fld("Full file content")
class WriteFileTool(BaseTool):
   name = "write_file"
   description = "Create or overwrite a file with the given content."
   InputModel = WriteFileInput
   kind = PermissionKind.WRITE
   async def execute(self, args: WriteFileInput, ctx) -> ToolResult:
       ctx.vfs.write(args.path, args.content)
       return ToolResult(f"Wrote {len(args.content)} bytes to {args.path}")
@dataclass
class ReadFileInput:
   path: str = fld("File path to read")
class ReadFileTool(BaseTool):
   name = "read_file"
   description = "Read the complete contents of a file."
   InputModel = ReadFileInput
   kind = PermissionKind.READ
   async def execute(self, args: ReadFileInput, ctx) -> ToolResult:
       if not ctx.vfs.exists(args.path):
           return ToolResult(f"No such file: {args.path}", is_error=True)
       return ToolResult(ctx.vfs.read(args.path))
@dataclass
class EditInput:
   path: str = fld("File to edit")
   old: str = fld("Exact substring to replace")
   new: str = fld("Replacement text")
class EditTool(BaseTool):
   name = "edit"
   description = "Replace the first occurrence of `old` with `new` in a file."
   InputModel = EditInput
   kind = PermissionKind.WRITE
   async def execute(self, args: EditInput, ctx) -> ToolResult:
       if not ctx.vfs.exists(args.path):
           return ToolResult(f"No such file: {args.path}", is_error=True)
       text = ctx.vfs.read(args.path)
       if args.old not in text:
           return ToolResult(f"`old` not found in {args.path}", is_error=True)
       ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))
       return ToolResult(f"Edited {args.path}: replaced 1 occurrence.")
@dataclass
class ListFilesInput:
   pattern: str = fld("Glob pattern", default="*")
class ListFilesTool(BaseTool):
   name = "list_files"
   description = "List files matching a glob pattern."
   InputModel = ListFilesInput
   kind = PermissionKind.READ
   async def execute(self, args: ListFilesInput, ctx) -> ToolResult:
       files = ctx.vfs.list(args.pattern)
       return ToolResult("\n".join(files) if files else "(no matches)")
@dataclass
class GrepInput:
   pattern: str = fld("Regex to search for")
   path_glob: str = fld("Which files to search", default="*")
class GrepTool(BaseTool):
   name = "grep"
   description = "Search file contents with a regular expression."
   InputModel = GrepInput
   kind = PermissionKind.READ
   async def execute(self, args: GrepInput, ctx) -> ToolResult:
       rx = re.compile(args.pattern)
       hits = []
       for p in ctx.vfs.list(args.path_glob):
           for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):
               if rx.search(line):
                   hits.append(f"{p}:{i}: {line.strip()}")
       return ToolResult("\n".join(hits) if hits else "(no matches)")
@dataclass
class RunPythonInput:
   files: list = fld("VFS files to exec in order in one namespace",
                     default_factory=list)
   code: str = fld("Extra Python code to run after the files", default="")
class RunPythonTool(BaseTool):
   name = "run_python"
   description = ("Execute Python from the virtual filesystem (and/or an inline "
                  "snippet) and capture stdout. Used to run tests.")
   InputModel = RunPythonInput
   kind = PermissionKind.EXECUTE
   async def execute(self, args: RunPythonInput, ctx) -> ToolResult:
       source_parts = []
       for p in args.files:
           if not ctx.vfs.exists(p):
               return ToolResult(f"No such file: {p}", is_error=True)
           source_parts.append(ctx.vfs.read(p))
       if args.code:
           source_parts.append(args.code)
       source = "\n\n".join(source_parts)
       buf = io.StringIO()
       # Note: this runs in-process with full builtins; it isolates the
       # namespace and captures stdout, but it is not a security sandbox.
       sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}
       try:
           with contextlib.redirect_stdout(buf):
               exec(compile(source, "<agent_code>", "exec"), sandbox_globals)
       except Exception as e:
           # Report the failing line from the agent's own code, if any.
           frames = [f for f in traceback.extract_tb(e.__traceback__)
                     if f.filename == "<agent_code>"]
           loc = ""
           if frames:
               last = frames[-1]
               src_lines = source.splitlines()
               text = (src_lines[last.lineno - 1].strip()
                       if 0 < last.lineno <= len(src_lines) else "")
               loc = f"  (line {last.lineno}: {text})" if text else f"  (line {last.lineno})"
           out = buf.getvalue()
           msg = f"{type(e).__name__}: {e}{loc}"
           return ToolResult(f"{out}\n{msg}".strip(), is_error=True)
       return ToolResult(buf.getvalue().strip() or "(ran with no output)")
@dataclass
class ShellInput:
   command: str = fld("Shell command to run")
class ShellTool(BaseTool):
   name = "shell"
   description = "Run a shell command (simulated over the virtual filesystem)."
   InputModel = ShellInput
   kind = PermissionKind.EXECUTE
   async def execute(self, args: ShellInput, ctx) -> ToolResult:
       cmd = args.command.strip()
       if cmd.startswith("ls"):
           return ToolResult("\n".join(ctx.vfs.list("*")) or "(empty)")
       if cmd.startswith("cat "):
           p = cmd[4:].strip()
           if ctx.vfs.exists(p):
               return ToolResult(ctx.vfs.read(p))
           return ToolResult(f"cat: {p}: No such file", is_error=True)
       if cmd.startswith("echo "):
           return ToolResult(cmd[5:])
       return ToolResult(f"(simulated) `{cmd}` executed.")
_FAKE_WEB = {
   "vector database":
       "Vector databases (FAISS, Milvus, pgvector) index embeddings for "
       "approximate nearest-neighbour search powering RAG.",
   "agent harness":
       "An agent harness is the infrastructure around an LLM: tools, memory, "
       "permissions, and a loop that turns model output into real actions.",
   "exponential backoff":
       "Exponential backoff retries failed calls after 2^n growing delays to "
       "avoid hammering a struggling service.",
}
@dataclass
class WebSearchInput:
   query: str = fld("Search query")
class WebSearchTool(BaseTool):
   name = "web_search"
   description = "Search the web for up-to-date information. (Mocked in tutorial.)"
   InputModel = WebSearchInput
   kind = PermissionKind.READ
   async def execute(self, args: WebSearchInput, ctx) -> ToolResult:
       await asyncio.sleep(0.05)
       q = args.query.lower()
       for key, val in _FAKE_WEB.items():
           if key in q:
               return ToolResult(f"[mock result] {val}")
       return ToolResult(f"[mock result] No canned answer for '{args.query}'. "
                         f"(Wire a real search API here.)")
@dataclass
class SkillInput:
   name: str = fld("Name of the skill to load")
class SkillTool(BaseTool):
   name = "skill"
   description = ("Load an on-demand skill (markdown playbook) into context. "
                  "Only load a skill right before you need its guidance.")
   InputModel = SkillInput
   kind = PermissionKind.META
   async def execute(self, args: SkillInput, ctx) -> ToolResult:
       body = ctx.skills.load(args.name)
       if body is None:
           return ToolResult(f"Unknown skill: {args.name}", is_error=True)
       return ToolResult(f"Loaded skill '{args.name}':\n\n{body}")
@dataclass
class RememberInput:
   note: str = fld("A durable fact/decision to remember across sessions")
class RememberTool(BaseTool):
   name = "remember"
   description = "Persist a durable fact to long-term memory (MEMORY.md)."
   InputModel = RememberInput
   kind = PermissionKind.META
   async def execute(self, args: RememberInput, ctx) -> ToolResult:
       ctx.memory.append(args.note)
       return ToolResult(f"Remembered: {args.note}")
@dataclass
class AskUserInput:
   question: str = fld("A question to ask the human")
class AskUserTool(BaseTool):
   name = "ask_user"
   description = "Ask the human a clarifying question."
   InputModel = AskUserInput
   kind = PermissionKind.META
   async def execute(self, args: AskUserInput, ctx) -> ToolResult:
       # In this tutorial, answers come from a pre-configured lookup table.
       canned = getattr(ctx, "canned_answers", {})
       ans = canned.get(args.question, "(no answer configured)")
       return ToolResult(f"User answered: {ans}")
@dataclass
class SpawnAgentInput:
   role: str = fld("Which agent profile to spawn (e.g. 'researcher')")
   task: str = fld("The task to delegate to that agent")
class SpawnAgentTool(BaseTool):
   name = "spawn_agent"
   description = ("Delegate a subtask to a specialised subagent and get its "
                  "final result. Multiple spawns in a single turn run in parallel.")
   InputModel = SpawnAgentInput
   kind = PermissionKind.META
   async def execute(self, args: SpawnAgentInput, ctx) -> ToolResult:
       result = await ctx.spawn(args.role, args.task)
       return ToolResult(f"[{args.role}] {result}")
@dataclass
class Skill:
   name: str
   description: str
   body: str
class SkillLibrary:
   """Holds skills. The *summary* (name + description) is injected into the
   system prompt; the *body* is only pulled in when the model loads it."""
   def __init__(self):
       self._skills: dict = {}
       self.loaded: list = []
   @staticmethod
   def parse_markdown(md: str) -> Skill:
       """Parse a skill .md with YAML-ish frontmatter (name/description)."""
       name, desc, body = "unnamed", "", md
       m = re.match(r"^---\s*\n(.*?)\n---\s*\n(.*)$", md, re.S)
       if m:
           front, body = m.group(1), m.group(2)
           for line in front.splitlines():
               if ":" in line:
                   k, v = line.split(":", 1)
                   if k.strip() == "name":
                       name = v.strip()
                   elif k.strip() == "description":
                       desc = v.strip()
       return Skill(name=name, description=desc, body=body.strip())
   def add_markdown(self, md: str) -> "SkillLibrary":
       s = self.parse_markdown(md)
       self._skills[s.name] = s
       return self
   def summary(self) -> str:
       if not self._skills:
           return "(none)"
       return "\n".join(f"- {s.name}: {s.description}"
                        for s in self._skills.values())
   def load(self, name: str) -> str | None:
       s = self._skills.get(name)
       if s is None:
           return None
       if name not in self.loaded:
           self.loaded.append(name)
       return s.body
class MemoryStore:
   def __init__(self, path: str):
       self.path = path
   def read(self) -> str:
       if os.path.exists(self.path):
           with open(self.path, "r", encoding="utf-8") as fh:
               return fh.read().strip()
       return ""
   def append(self, note: str) -> None:
       with open(self.path, "a", encoding="utf-8") as fh:
           fh.write(f"- {note}\n")
   def reset(self) -> None:
       if os.path.exists(self.path):
           os.remove(self.path)

We build the practical tool layer that allows the harness to read files, write files, edit content, list files, search text, run Python code, simulate shell commands, search mock web data, load skills, remember notes, ask the user, and spawn subagents. We define each tool with typed inputs, descriptions, permissions, and executable behavior so the agent can interact with its environment in a structured way. We also add the skill library and persistent memory store, which help the agent load knowledge on demand and preserve useful information across sessions.
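The on-demand loading idea is easiest to see with a concrete skill file. The sketch below is a standalone approximation of the frontmatter parsing described above: only a one-line summary would go into the system prompt, while the body stays out of context until requested. The skill text, the `parse_skill` helper, and the dictionary layout are made up for illustration.

```python
import re

# A made-up skill file: frontmatter (name/description) followed by the body.
SKILL_MD = """---
name: debugging
description: Steps for isolating a failing test
---
1. Reproduce the failure.
2. Bisect the change.
"""

def parse_skill(md: str) -> dict:
    """Split a skill document into frontmatter metadata and body."""
    meta = {"name": "unnamed", "description": ""}
    body = md
    m = re.match(r"^---\s*\n(.*?)\n---\s*\n(.*)$", md, re.S)
    if m:
        front, body = m.group(1), m.group(2)
        for line in front.splitlines():
            if ":" in line:
                key, value = line.split(":", 1)
                if key.strip() in meta:      # ignore unknown keys
                    meta[key.strip()] = value.strip()
    return {**meta, "body": body.strip()}

skill = parse_skill(SKILL_MD)
# Cheap summary line for the system prompt; the body is loaded later.
summary_line = f"- {skill['name']}: {skill['description']}"
print(summary_line)
print(len(skill["body"]), "characters held back until the skill is loaded")
```

Keeping only the summary in the prompt trades a small, fixed token cost for the ability to register many skills without inflating every request.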

Defining the Model Brain Layer

class TransientLLMError(Exception):
   """A retryable error (timeout / 5xx / rate-limit)."""
class LLMBrain:
   """Interface: given the transcript + tool schemas, produce the next turn."""
   model = "abstract"
   async def stream(self, messages: list, tools: list, on_event=None) -> AssistantTurn:
       raise NotImplementedError
def Say(text: str) -> dict:
   return {"final": True, "text": text, "calls": []}
def Use(thought: str, calls: list) -> dict:
   return {"final": False, "text": thought, "calls": calls}
def last_tool_results(messages: list) -> list:
   """The tool results produced since the last assistant turn."""
   out = []
   for m in reversed(messages):
       if m.role == "tool":
           out.append({"name": m.name, "content": m.content})
       elif m.role == "assistant":
           break
   return list(reversed(out))
class ScriptedBrain(LLMBrain):
   model = "mock-sonnet"
   def __init__(self, script: list, name: str = "mock-sonnet"):
       self.script = script
       self.i = 0
       self.model = name
   async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
       if self.i >= len(self.script):
           return AssistantTurn(text="(script exhausted)", stop_reason="end_turn")
       step = self.script[self.i]
       self.i += 1
       # A step is either a fixed action or a callable that inspects the transcript.
       action = step(messages) if callable(step) else step
       text = action["text"]
       if on_event and text:
           on_event("text", text)
       calls = []
       for j, (tname, targs) in enumerate(action["calls"]):
           tc = ToolCall(id=f"call_{self.i}_{j}", name=tname, arguments=targs)
           calls.append(tc)
           if on_event:
               on_event("tool_call", tc)
       in_tok = sum(count_tokens(m.content) for m in messages) + 200
       out_tok = count_tokens(text) + sum(count_tokens(json.dumps(c.arguments))
                                          for c in calls)
       return AssistantTurn(
           text=text,
           tool_calls=calls,
           stop_reason="tool_use" if calls else "end_turn",
           usage=Usage(in_tok, out_tok),
       )
class FlakyBrain(LLMBrain):
   """Wraps a brain and fails the first `fail_times` calls with a transient
   error, so we can watch the engine retry with exponential backoff."""
   def __init__(self, inner: LLMBrain, fail_times: int = 1):
       self.inner = inner
       self.failures_left = fail_times
       self.model = inner.model
   async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
       if self.failures_left > 0:
           self.failures_left -= 1
           raise TransientLLMError("simulated 503 from provider")
       return await self.inner.stream(messages, tools, on_event)
class RetryingBrain(LLMBrain):
   def __init__(self, inner: LLMBrain, retries: int = 4, base_delay: float = 0.05):
       self.inner = inner
       self.retries = retries
       self.base_delay = base_delay
       self.model = inner.model
   async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
       attempt = 0
       while True:
           try:
               return await self.inner.stream(messages, tools, on_event)
           except TransientLLMError as e:
               if attempt >= self.retries:
                   raise
               # Delay doubles on each retry: base, 2*base, 4*base, ...
               delay = self.base_delay * (2 ** attempt)
               print(f"        ⚠  transient error ({e}); "
                     f"retry {attempt + 1}/{self.retries} in {delay:.2f}s")
               await asyncio.sleep(delay)
               attempt += 1
class RealLLMBrain(LLMBrain):
   def __init__(self, api_format: str, model: str, api_key: str,
                base_url: str, system: str = ""):
       self.api_format = api_format
       self.model = model
       self.api_key = api_key
       self.base_url = base_url.rstrip("/")
       self.system = system
   def _post(self, url: str, headers: dict, payload: dict) -> dict:
       data = json.dumps(payload).encode()
       req = urllib.request.Request(url, data=data, headers=headers, method="POST")
       try:
           with urllib.request.urlopen(req, timeout=60) as resp:
               return json.loads(resp.read().decode())
       except urllib.error.HTTPError as e:
           # Rate limits and 5xx responses are retryable; anything else is not.
           if e.code in (429, 500, 502, 503, 504):
               raise TransientLLMError(f"HTTP {e.code}")
           raise
       except urllib.error.URLError as e:
           raise TransientLLMError(str(e))
   def _tools_anthropic(self, tools: list) -> list:
       return [{"name": t["name"], "description": t["description"],
                "input_schema": t["input_schema"]} for t in tools]
   def _tools_openai(self, tools: list) -> list:
       return [{"type": "function", "function": {
           "name": t["name"], "description": t["description"],
           "parameters": t["input_schema"]}} for t in tools]
   def _msgs_anthropic(self, messages: list) -> list:
       out = []
       for m in messages:
           if m.role == "user":
               out.append({"role": "user", "content": m.content})
           elif m.role == "assistant":
               blocks = []
               if m.content:
                   blocks.append({"type": "text", "text": m.content})
               for tc in m.tool_calls:
                   blocks.append({"type": "tool_use", "id": tc.id,
                                  "name": tc.name, "input": tc.arguments})
               out.append({"role": "assistant", "content": blocks or m.content})
           elif m.role == "tool":
               # Anthropic expects tool results as user-role tool_result blocks.
               out.append({"role": "user", "content": [
                   {"type": "tool_result", "tool_use_id": m.tool_call_id,
                    "content": m.content}]})
       return out
   def _msgs_openai(self, messages: list) -> list:
       out = []
       if self.system:
           out.append({"role": "system", "content": self.system})
       for m in messages:
           if m.role == "user":
               out.append({"role": "user", "content": m.content})
           elif m.role == "assistant":
               msg = {"role": "assistant", "content": m.content or None}
               if m.tool_calls:
                   msg["tool_calls"] = [{
                       "id": tc.id, "type": "function",
                       "function": {"name": tc.name,
                                    "arguments": json.dumps(tc.arguments)}}
                       for tc in m.tool_calls]
               out.append(msg)
           elif m.role == "tool":
               out.append({"role": "tool", "tool_call_id": m.tool_call_id,
                           "content": m.content})
       return out
   async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
       # The blocking urllib call runs in the default executor so the event
       # loop stays responsive.
       loop = asyncio.get_running_loop()
       if self.api_format == "anthropic":
           payload = {"model": self.model, "max_tokens": 1024,
                      "system": self.system,
                      "messages": self._msgs_anthropic(messages),
                      "tools": self._tools_anthropic(tools)}
           headers = {"x-api-key": self.api_key,
                      "anthropic-version": "2023-06-01",
                      "content-type": "application/json"}
           url = f"{self.base_url}/v1/messages"
           data = await loop.run_in_executor(None, self._post, url, headers, payload)
           text, calls = "", []
           for block in data.get("content", []):
               if block.get("type") == "text":
                   text += block["text"]
               elif block.get("type") == "tool_use":
                   calls.append(ToolCall(block["id"], block["name"],
                                         block.get("input", {})))
           u = data.get("usage", {})
           usage = Usage(u.get("input_tokens", 0), u.get("output_tokens", 0))
           stop = "tool_use" if calls else "end_turn"
       else:
           payload = {"model": self.model,
                      "messages": self._msgs_openai(messages),
                      "tools": self._tools_openai(tools)}
           headers = {"Authorization": f"Bearer {self.api_key}",
                      "Content-Type": "application/json"}
           url = f"{self.base_url}/chat/completions"
           data = await loop.run_in_executor(None, self._post, url, headers, payload)
           choice = data["choices"][0]["message"]
           text = choice.get("content") or ""
           calls = []
           for tc in (choice.get("tool_calls") or []):
               fn = tc["function"]
               try:
                   cargs = json.loads(fn.get("arguments") or "{}")
               except json.JSONDecodeError:
                   cargs = {}
               calls.append(ToolCall(tc["id"], fn["name"], cargs))
           u = data.get("usage", {})
           usage = Usage(u.get("prompt_tokens", 0), u.get("completion_tokens", 0))
           stop = "tool_use" if calls else "end_turn"
       if on_event and text:
           on_event("text", text)
       for c in calls:
           if on_event:
               on_event("tool_call", c)
       return AssistantTurn(text=text, tool_calls=calls,
                            stop_reason=stop, usage=usage)
def make_real_brain(system: str = "") -> LLMBrain | None:
   """Build a real brain from env vars, or return None to fall back to mock."""
   if os.environ.get("USE_REAL_LLM", "0") not in ("1", "true", "True"):
       return None
   if os.environ.get("ANTHROPIC_API_KEY"):
       return RealLLMBrain(
           api_format="anthropic",
           model=os.environ.get("MODEL", "claude-sonnet-4-6"),
           api_key=os.environ["ANTHROPIC_API_KEY"],
           base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com"),
           system=system)
   if os.environ.get("OPENAI_API_KEY"):
       return RealLLMBrain(
           api_format="openai",
           model=os.environ.get("MODEL", "gpt-4.1"),
           api_key=os.environ["OPENAI_API_KEY"],
           base_url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
           system=system)
   return None

We define the model brain layer that decides what the agent does next in the loop. We create a scripted mock brain for deterministic execution, a flaky brain to simulate provider errors, a retrying wrapper with exponential backoff, and a real provider brain for Anthropic- or OpenAI-compatible APIs. We use this snippet to show that the harness stays the same while the intelligence layer switches between mock execution and real LLM calls.
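The retry-with-backoff idea can be tried in isolation. The following is a minimal sketch, not the tutorial's own classes: `TransientError`, `retry_with_backoff`, and `make_flaky` are hypothetical names introduced only for illustration, and the delays are kept tiny so the example finishes quickly.

```python
import asyncio

class TransientError(Exception):
    """Hypothetical stand-in for a retryable provider error."""

async def retry_with_backoff(call, retries=3, base_delay=0.01):
    """Await call(); on TransientError sleep base_delay * 2**attempt, then retry.

    Returns (result, delays) so the caller can inspect the waits that were used.
    """
    delays = []
    attempt = 0
    while True:
        try:
            return await call(), delays
        except TransientError:
            if attempt >= retries:
                raise  # retry budget exhausted: surface the error
            delay = base_delay * (2 ** attempt)
            delays.append(delay)
            await asyncio.sleep(delay)
            attempt += 1

def make_flaky(fail_times):
    """Return a coroutine function that fails fail_times times, then succeeds."""
    state = {"left": fail_times}
    async def flaky():
        if state["left"] > 0:
            state["left"] -= 1
            raise TransientError("simulated 503")
        return "ok"
    return flaky

result, delays = asyncio.run(retry_with_backoff(make_flaky(2)))
print(result, delays)
```

Because the wrapper only needs something awaitable, the same pattern applies whether the wrapped call is a mock or a real HTTP request, which is the point of layering the brains.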

Assembling the System Prompt and the QueryEngine Agent Loop

def assemble_system_prompt(*, base: str, project_context: str, memory: str,
                          skills_summary: str, tool_names: list) -> str:
   """Mirror OpenHarness prompt assembly: base + CLAUDE.md + MEMORY.md +
   on-demand skill list + available tools."""
   parts = [base.strip()]
   if project_context:
       parts.append(f"## Project context (CLAUDE.md)\n{project_context.strip()}")
   if memory:
       parts.append(f"## Long-term memory (MEMORY.md)\n{memory.strip()}")
   if skills_summary and skills_summary != "(none)":
       parts.append("## Available skills (load on demand with the `skill` tool)\n"
                    + skills_summary)
   parts.append("## Available tools\n" + ", ".join(tool_names))
   return "\n\n".join(parts)
def estimate_messages_tokens(messages: list) -> int:
   return sum(count_tokens(m.content) +
              sum(count_tokens(json.dumps(c.arguments)) for c in m.tool_calls)
              for m in messages)
def maybe_compact(messages: list, *, max_tokens: int, keep_last: int = 4,
                 verbose: bool = True) -> list:
   """Auto-compaction: when the transcript grows past `max_tokens`, summarize
   the older middle into one note while preserving the original task and the
   most recent turns. Real OpenHarness asks the model to write the summary and
   also preserves task state + channel logs; we use a heuristic here."""
   tok = estimate_messages_tokens(messages)
   if tok <= max_tokens or len(messages) <= keep_last + 1:
       return messages
   first = messages[0]
   tail = messages[-keep_last:]
   middle = messages[1:-keep_last]
   info = []
   for m in middle:
       if m.role == "tool" and not m.content.startswith("ERROR"):
           info.append(f"{m.name}: {short(m.content, 80)}")
       elif m.role == "assistant" and m.content:
           info.append(f"assistant: {short(m.content, 80)}")
   summary = Message(
       role="system",
       content=("[Auto-compacted context] Earlier "
                f"{len(middle)} messages summarized. Key results:\n  - "
                + "\n  - ".join(info[-8:])),
   )
   compacted = [first, summary] + tail
   if verbose:
       print(f"        🗜  auto-compaction: {len(messages)} msgs "
             f"(~{tok} tok) -> {len(compacted)} msgs "
             f"(~{estimate_messages_tokens(compacted)} tok)")
   return compacted
def console_printer(event: str, data) -> None:
   if event == "text" and data:
       print(f"  🤖 {short(data, 400)}")
   elif event == "tool_call":
       print(f"     ↳ call {data.name}({short(json.dumps(data.arguments), 120)})")
   elif event == "tool_result":
       flag = "✗" if data.get("is_error") else "✓"
       print(f"       {flag} {short(data['output'], 200)}")
class QueryEngine:
   def __init__(self, *, brain: LLMBrain, registry: ToolRegistry,
                ctx: ToolContext, perms: PermissionChecker, hooks: HookManager,
                system_prompt: str, cost: CostMeter | None = None,
                approve=auto_approve, max_turns: int = 12,
                compact_at_tokens: int | None = None):
       self.brain = brain
       self.registry = registry
       self.ctx = ctx
       self.perms = perms
       self.hooks = hooks
       self.system_prompt = system_prompt
       self.cost = cost or CostMeter(getattr(brain, "model", "default"))
       self.approve = approve
       self.max_turns = max_turns
       self.compact_at_tokens = compact_at_tokens
   async def execute_tool(self, call: ToolCall, on_event=None) -> ToolResult:
       tool = self.registry.get(call.name)
       if tool is None:
           return ToolResult(f"Unknown tool: {call.name}", is_error=True)
       # 1) Permission check: deny outright, ask the user, or allow.
       decision = self.perms.check(tool, call.arguments)
       if decision.action == "deny":
           r = ToolResult(f"DENIED by permissions: {decision.reason}", is_error=True)
           if on_event:
               on_event("tool_result", {"output": r.output, "is_error": True})
           return r
       if decision.action == "ask":
           ok = await self.approve(tool, call.arguments, decision.reason)
           if not ok:
               r = ToolResult("DENIED by user.", is_error=True)
               if on_event:
                   on_event("tool_result", {"output": r.output, "is_error": True})
               return r
       # 2) Pre-hooks may block the call or rewrite its arguments.
       pre = self.hooks.run_pre(call, tool, self.ctx)
       if pre.blocked:
           r = ToolResult(f"BLOCKED by hook: {pre.reason}", is_error=True)
           if on_event:
               on_event("tool_result", {"output": r.output, "is_error": True})
           return r
       args = pre.arguments
       # 3) Execute; tool exceptions become error results, not crashes.
       try:
           result = await tool.run(args, self.ctx)
       except Exception as e:
           result = ToolResult(f"Tool raised: {e}", is_error=True)
       # 4) Post-hooks see (and may transform) the result.
       result = self.hooks.run_post(call, tool, result, self.ctx)
       if on_event:
           on_event("tool_result", {"output": result.output,
                                    "is_error": result.is_error})
       return result
   async def run(self, task: str, on_event=console_printer) -> str:
       messages: list = [Message(role="user", content=task)]
       for turn in range(self.max_turns):
           if self.compact_at_tokens:
               messages = maybe_compact(messages, max_tokens=self.compact_at_tokens)
           assistant = await self.brain.stream(messages, self.registry.schemas(),
                                               on_event)
           self.cost.add(assistant.usage)
           if assistant.stop_reason != "tool_use" or not assistant.tool_calls:
               return assistant.text
           messages.append(Message(role="assistant", content=assistant.text,
                                   tool_calls=assistant.tool_calls))
           # Tool calls issued in the same turn run concurrently.
           results = await asyncio.gather(
               *[self.execute_tool(tc, on_event) for tc in assistant.tool_calls]
           )
           for tc, res in zip(assistant.tool_calls, results):
               content = res.output
               # Every error result (tool failure, DENIED, BLOCKED) gets the
               # same "ERROR: " prefix so the brain can detect it.
               if res.is_error:
                   content = "ERROR: " + content
               messages.append(Message(role="tool", name=tc.name,
                                       tool_call_id=tc.id, content=content))
       return "(stopped: reached max_turns)"
DEFAULT_TOOLS = [
   WriteFileTool, ReadFileTool, EditTool, ListFilesTool, GrepTool,
   RunPythonTool, ShellTool, WebSearchTool, SkillTool, RememberTool,
   AskUserTool, SpawnAgentTool,
]
def build_registry(tool_classes=None) -> ToolRegistry:
   reg = ToolRegistry()
   for cls in (tool_classes or DEFAULT_TOOLS):
       reg.register(cls())
   return reg
BASE_SYSTEM = (
   "You are an autonomous coding/research agent running inside OpenHarness. "
   "Use tools to take real actions. Think step-by-step, verify your work by "
   "running it, and only stop when the task is truly complete."
)

We assemble the system prompt, estimate transcript size, compact long conversations, print streaming events, and define the main QueryEngine. We make the engine responsible for asking the brain what to do, checking permissions, running hooks, executing tools, collecting results, and looping until the task is finished. We also register the default tools and create the base system instruction that guides the agent toward verified, tool-based work.
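Stripped of permissions, hooks, and cost tracking, the loop itself is small. The sketch below is a simplified, synchronous illustration under stated assumptions: `Step` and `run_loop` are hypothetical names, tools are plain Python callables, and the "model" is just a list of scripted steps rather than the tutorial's brain classes.

```python
from dataclasses import dataclass, field

@dataclass
class Step:
    """One scripted model turn: text plus zero or more (tool, args) calls."""
    text: str
    calls: list = field(default_factory=list)

def run_loop(script, tools, task, max_turns=8):
    """Drive scripted steps until one has no tool calls; return (answer, transcript)."""
    transcript = [("user", task)]
    for turn in range(max_turns):
        if turn >= len(script):
            break
        step = script[turn]
        if not step.calls:  # no tool use means this is the final answer
            return step.text, transcript
        transcript.append(("assistant", step.text))
        for name, args in step.calls:
            fn = tools.get(name)
            if fn is None:
                obs = f"ERROR: unknown tool {name}"
            else:
                try:
                    obs = str(fn(**args))
                except Exception as e:  # tool errors become observations
                    obs = f"ERROR: {e}"
            transcript.append(("tool", obs))
    return "(stopped: reached max_turns)", transcript

tools = {"add": lambda a, b: a + b}
script = [
    Step("Adding the numbers.", [("add", {"a": 2, "b": 3})]),
    Step("Calling a tool that does not exist.", [("nope", {})]),
    Step("The sum is 5."),
]
answer, transcript = run_loop(script, tools, "What is 2 + 3?")
print(answer)
```

Note that a failing or unknown tool does not abort the loop: the error is fed back as an observation, which is what lets a model react and recover on the next turn.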

Running the Agent Loop, Permission Governance, and On-Demand Skills Demos

async def demo_agent_loop():
   explain(
       "DEMO 1 — The Agent Loop (write code → run → see it fail → fix → pass)",
       """This is the centerpiece. A scripted 'brain' drives the real harness:
       each turn it emits tool calls, the engine runs them through permissions +
       hooks, feeds results back, and the brain REACTS. It writes a factorial
       module with an off-by-one bug, writes a test, runs it, sees the failure
       in the tool result, fixes the bug, re-runs to green, and saves a note to
       memory. We also wrap the brain in retry/backoff and a one-time transient
       failure so you can watch recovery — and we track token cost throughout.""")
   buggy = (
       "def factorial(n):\n"
       "    result = 1\n"
       "    for i in range(1, n):\n"
       "        result *= i\n"
       "    return result\n"
   )
   test_code = (
       'assert factorial(0) == 1, "0! should be 1"\n'
       'assert factorial(5) == 120, "5! should be 120 but got " + str(factorial(5))\n'
       'print("All tests passed: 0!=1 and 5!=120")\n'
   )
   def reactive_fix(messages):
       outs = last_tool_results(messages)
       failed = any("ERROR" in o["content"] or "Assert" in o["content"]
                    for o in outs)
       if failed:
           return Use("The test failed — classic off-by-one in the loop range. "
                      "range(1, n) stops at n-1, so I'll make it range(1, n + 1).",
                      [("edit", {"path": "mathutils.py",
                                 "old": "range(1, n)",
                                 "new": "range(1, n + 1)"})])
       return Use("Tests already pass; re-running to confirm.",
                  [("run_python", {"files": ["mathutils.py", "test_math.py"]})])
   script = [
       Use("First I'll create the factorial module.",
           [("write_file", {"path": "mathutils.py", "content": buggy})]),
       Use("Now a test that pins down the expected behavior.",
           [("write_file", {"path": "test_math.py", "content": test_code})]),
       Use("Let me run the test to see if it works.",
           [("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
       reactive_fix,
       Use("Re-running the test after the fix.",
           [("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
       Use("I'll remember the lesson for next time.",
           [("remember",
             {"note": "factorial(n): loop must be range(1, n+1); "
                      "range(1, n) gives n!/n (off-by-one)."})]),
       lambda m: Say("Done ✅  Created mathutils.py with factorial(), wrote a "
                     "test, caught an off-by-one bug (range(1, n) → "
                     "range(1, n + 1)), fixed it, and the test now passes. "
                     "Saved the lesson to memory."),
   ]
   vfs = VirtualFS()
   memory = MemoryStore(os.path.join(tempfile.gettempdir(), "oh_demo1_mem.md"))
   memory.reset()
   skills = SkillLibrary()
   registry = build_registry()
   ctx = ToolContext(vfs=vfs, memory=memory, skills=skills)
   hooks = HookManager()
   hooks.add_pre(lambda call, tool, c:
                 print(f"        🪝 pre-hook saw {tool.name}") or None)
   brain = RetryingBrain(FlakyBrain(ScriptedBrain(script), fail_times=1))
   cost = CostMeter("mock-sonnet")
   engine = QueryEngine(
       brain=brain, registry=registry, ctx=ctx,
       perms=PermissionChecker(PermissionMode.DEFAULT),
       hooks=hooks,
       system_prompt=assemble_system_prompt(
           base=BASE_SYSTEM, project_context="", memory=memory.read(),
           skills_summary=skills.summary(), tool_names=registry.names()),
       cost=cost, approve=auto_approve, max_turns=10,
   )
   print("\n[running the agent loop]\n")
   final = await engine.run("Implement factorial() with a test; make tests pass.")
   print(f"\n  FINAL ANSWER:\n  {final}")
   print(f"\n  Virtual filesystem now contains:\n{vfs.tree()}")
   print(f"\n  Final mathutils.py:\n" +
         textwrap.indent(vfs.read("mathutils.py"), "    "))
   print(f"\n  💰 cost: {cost.summary()}")
   print("\n  TAKEAWAY: the model only decides WHAT to do; the harness handles "
         "HOW — permissions, hooks, execution, retries, results, and cost.")
async def demo_permissions():
   explain(
       "DEMO 2 — Governance: permission modes, path rules, denied commands",
       """The harness is the safety boundary. The same write_file tool behaves
       differently under default/auto/plan modes; sensitive paths are always
       denied; path rules and denied-command patterns add fine-grained control;
       and PreToolUse hooks can veto a call outright.""")
   write = WriteFileTool()
   shell = ShellTool()
   print("\n  (a) Same write under each mode:")
   for mode in (PermissionMode.DEFAULT, PermissionMode.AUTO, PermissionMode.PLAN):
       d = PermissionChecker(mode).check(write, {"path": "notes.txt", "content": "x"})
       print(f"      {mode.value:8s} -> {d.action.upper():5s}  ({d.reason})")
   print("\n  (b) Built-in sensitive-path protection (any mode):")
   for p in ["app/main.py", ".env", "~/.ssh/id_rsa", "secrets/key.txt"]:
       d = PermissionChecker(PermissionMode.AUTO).check(write, {"path": p, "content": "x"})
       print(f"      write {p:18s} -> {d.action.upper():5s}  ({d.reason})")
   print("\n  (c) Custom path_rules + denied_commands:")
   pc = PermissionChecker(
       PermissionMode.AUTO,
       path_rules=[{"pattern": "build/*", "allow": False}],
       denied_commands=[r"rm\s+-rf\s+/", r"DROP\s+TABLE"])
   print("      write build/x ->",
         pc.check(write, {"path": "build/x", "content": "x"}).action.upper())
   print("      shell 'rm -rf /' ->",
         pc.check(shell, {"command": "rm -rf /"}).action.upper())
   print("      shell 'ls -la' ->",
         pc.check(shell, {"command": "ls -la"}).action.upper())
   print("\n  (d) A PreToolUse security hook that vetoes writing credentials:")
   def security_hook(call, tool, ctx):
       if tool.name == "write_file" and "password" in call.arguments.get("content", "").lower():
           return HookOutcome(blocked=True, reason="content looks like a secret")
       return None
   hooks = HookManager().add_pre(security_hook)
   ctx = ToolContext(vfs=VirtualFS())
   engine = QueryEngine(brain=ScriptedBrain([]), registry=build_registry(),
                        ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
                        hooks=hooks, system_prompt="", approve=auto_approve)
   r = await engine.execute_tool(
       ToolCall("c1", "write_file",
                {"path": "config.txt", "content": "password=hunter2"}),
       on_event=console_printer)
   print(f"      result -> {r.output}")
   print("\n  TAKEAWAY: permissions + hooks form layered defense around every "
         "single tool call, independent of how clever the model is.")
async def demo_skills():
   explain(
       "DEMO 3 — Skills: on-demand knowledge loading",
       """Skills are markdown playbooks. Only their name+description sit in the
       system prompt (cheap); the full body is pulled in ONLY when the model
       decides it needs it, via the `skill` tool. This keeps context small while
       giving the agent deep, swappable expertise. Compatible in spirit with
       anthropics/skills (drop a .md file in and it just works).""")
   skills = SkillLibrary()
   skills.add_markdown(textwrap.dedent("""
       ---
       name: commit
       description: Create clear, conventional git commits
       ---
       # Commit skill
       2. Group related changes; write a Conventional Commit subject
          (feat:, fix:, docs:, refactor:, test:).
       3. Keep the subject <= 50 chars; explain *why* in the body.
       """))
   skills.add_markdown(textwrap.dedent("""
       ---
       name: review
       description: Review code for bugs, security, and readability
       ---
       # Review skill
       Check: correctness, edge cases, input validation, error handling,
       secrets in code, naming, and test coverage. Be specific and kind.
       """))
   registry = build_registry()
   ctx = ToolContext(vfs=VirtualFS(), skills=skills,
                     memory=MemoryStore(os.path.join(tempfile.gettempdir(),
                                                     "oh_d3.md")))
   sysprompt = assemble_system_prompt(
       base=BASE_SYSTEM, project_context="", memory="",
       skills_summary=skills.summary(), tool_names=registry.names())
   print("  System prompt's skills section (names+descriptions only):")
   print(textwrap.indent(skills.summary(), "      "))
   print(f"\n  Loaded skill bodies so far: {skills.loaded}  (none yet!)")
   script = [
       Use("This is a commit task — I'll load the commit skill first.",
           [("skill", {"name": "commit"})]),
       lambda m: Say("Following the commit playbook I just loaded, I'd stage the "
                     "related changes and write: 'fix: correct factorial "
                     "off-by-one in range bound'."),
   ]
   engine = QueryEngine(brain=ScriptedBrain(script), registry=registry, ctx=ctx,
                        perms=PermissionChecker(PermissionMode.AUTO),
                        hooks=HookManager(), system_prompt=sysprompt)
   print("\n[running]\n")
   final = await engine.run("Help me commit my changes properly.")
   print(f"\n  FINAL: {final}")
   print(f"\n  Loaded skill bodies now: {skills.loaded}  (loaded on demand)")
   print("\n  TAKEAWAY: knowledge is paged in just-in-time, not crammed into "
         "every prompt.")

We implement the first three demos to show the harness in action. We first run a complete agent loop where the system writes buggy factorial code, tests it, detects the failure, fixes the bug, reruns the test, and saves the lesson to memory. We then explore permission governance and on-demand skills to see how safety controls and reusable playbooks improve the agent workflow.
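To make the layered permission idea concrete, here is a small standalone sketch. It is not the tutorial's `PermissionChecker`: the `check` function, the `SENSITIVE` pattern list, the read-only tool set, and the exact meaning of each mode (plan denies writes, auto allows them, default asks) are all assumptions chosen for illustration.

```python
import fnmatch
import re

# Illustrative patterns only; a real deployment would maintain its own list.
SENSITIVE = ["*.env", "*/.ssh/*", "secrets/*"]
READ_ONLY_TOOLS = {"read_file", "list_files"}  # assumed read-only tool names

def check(mode, tool, args, denied_commands=()):
    """Return 'allow', 'ask', or 'deny' for one tool call.

    Order matters: hard denials (sensitive paths, denied commands) are
    evaluated before the mode, so no mode can override them.
    """
    path = args.get("path", "")
    if path and any(fnmatch.fnmatch(path, p) for p in SENSITIVE):
        return "deny"
    command = args.get("command", "")
    if command and any(re.search(p, command) for p in denied_commands):
        return "deny"
    if tool in READ_ONLY_TOOLS:
        return "allow"
    if mode == "plan":   # assumed: plan mode never mutates anything
        return "deny"
    return "allow" if mode == "auto" else "ask"

for mode in ("default", "auto", "plan"):
    print(mode, "->", check(mode, "write_file", {"path": "notes.txt"}))
```

Putting the hard denials first is the design choice worth noticing: even the most permissive mode cannot write to a path matched by the sensitive list.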

Adding Memory, Context Compaction, and Multi-Agent Coordination

async def demo_memory():
   explain(
       "DEMO 4 — Memory: persistent MEMORY.md across sessions",
       """Long-term memory survives between runs by persisting to MEMORY.md. In
       session 1 the agent records a user preference; in a brand-new session 2
       (fresh engine, fresh transcript) that memory is injected into the system
       prompt, so the agent already 'knows' the user.""")
   mem_path = os.path.join(tempfile.gettempdir(), "oh_demo4_MEMORY.md")
   memory = MemoryStore(mem_path)
   memory.reset()
   registry = build_registry()
   print("  ── Session 1 ──")
   ctx1 = ToolContext(vfs=VirtualFS(), memory=memory, skills=SkillLibrary())
   s1 = [
       Use("I'll remember the user's stated preferences.",
           [("remember", {"note": "User prefers metric units and concise answers."})]),
       lambda m: Say("Noted your preferences for next time."),
   ]
   eng1 = QueryEngine(brain=ScriptedBrain(s1), registry=registry, ctx=ctx1,
                      perms=PermissionChecker(PermissionMode.AUTO),
                      hooks=HookManager(),
                      system_prompt=assemble_system_prompt(
                          base=BASE_SYSTEM, project_context="",
                          memory=memory.read(),
                          skills_summary="(none)", tool_names=registry.names()))
   await eng1.run("Remember that I like metric units and short answers.")
   print(f"  MEMORY.md is now:\n{textwrap.indent(memory.read(), '      ')}")
   print("\n  ── Session 2 (new session, memory reloaded from disk) ──")
   memory2 = MemoryStore(mem_path)
   ctx2 = ToolContext(vfs=VirtualFS(), memory=memory2, skills=SkillLibrary())
   sysprompt2 = assemble_system_prompt(
       base=BASE_SYSTEM, project_context="", memory=memory2.read(),
       skills_summary="(none)", tool_names=registry.names())
   print("  The new system prompt already contains:")
   print(textwrap.indent("## Long-term memory (MEMORY.md)\n" + memory2.read(),
                         "      "))
   s2 = [lambda m: Say("Since you prefer metric and brevity: it's about 5 km. 🙂")]
   eng2 = QueryEngine(brain=ScriptedBrain(s2), registry=registry, ctx=ctx2,
                      perms=PermissionChecker(PermissionMode.AUTO),
                      hooks=HookManager(), system_prompt=sysprompt2)
   final = await eng2.run("How far is a 5000 meter run, roughly?")
   print(f"\n  FINAL: {final}")
   print("\n  TAKEAWAY: state that should outlive a conversation goes to memory, "
         "then is re-injected at the start of future sessions.")
async def demo_compaction():
   explain(
       "DEMO 5 — Context auto-compaction (multi-day sessions without overflow)",
       """As a session grows, the transcript can blow past the context window.
       Auto-compaction summarizes the older middle of the conversation into a
       compact note while preserving the original task and the most recent
       turns — so long-running agents keep going. (We force a tiny threshold to
       trigger it; real OpenHarness asks the model to write the summary.)""")
   msgs = [Message(role="user", content="Build and verify a data pipeline.")]
   for i in range(8):
       msgs.append(Message(role="assistant", content=f"Step {i}: doing work...",
                           tool_calls=[ToolCall(f"c{i}", "shell",
                                                {"command": f"process chunk {i}"})]))
       msgs.append(Message(role="tool", name="shell", tool_call_id=f"c{i}",
                           content=f"chunk {i} processed: 1000 rows ok " * 4))
   before = estimate_messages_tokens(msgs)
   print(f"  Before: {len(msgs)} messages, ~{before} tokens")
   compacted = maybe_compact(msgs, max_tokens=300, keep_last=4)
   after = estimate_messages_tokens(compacted)
   print(f"  After:  {len(compacted)} messages, ~{after} tokens "
         f"({100 * (before - after) // before}% smaller)")
   print("\n  The injected summary message:")
   print(textwrap.indent(compacted[1].content, "      "))
   print("\n  TAKEAWAY: the harness manages the context window so the agent can "
         "run far longer than a single window allows.")
async def demo_multi_agent():
   clarify(
       "DEMO 6 — Swarm coordination: spawning parallel subagents",
       """A lead agent decomposes a job and delegates to specialised subagents.
       Each subagent is its OWN harness (personal loop, personal mind, personal instruments). Two
       researchers run IN PARALLEL (issued in the identical flip → asyncio.collect),
       then a author synthesizes their findings. The crew registry tracks who
       did what.""")
   def researcher_profile():
       reg = build_registry([WebSearchTool])
       script = [
           Use("Researching via web search.",
               [("web_search", {"query": "PLACEHOLDER"})]),
           lambda m: Say("Summary: " +
                         quick(last_tool_results(m)[0]["content"], 160)),
       ]
       return ScriptedBrain(script), reg
   def writer_profile():
       reg = build_registry([WriteFileTool])
       script = [lambda m: Say("Synthesized brief combining both research notes "
                               "into a coherent paragraph.")]
       return ScriptedBrain(script), reg
   # Role name -> factory returning a fresh (brain, registry) pair per subagent.
   profiles = {"researcher": researcher_profile, "writer": writer_profile}
   vfs = VirtualFS()
   memory = MemoryStore(os.path.join(tempfile.gettempdir(), "oh_d6.md"))
   skills = SkillLibrary()
   team: list = []  # one entry per completed subagent run
   def make_spawn():
       async def spawn(role: str, task: str) -> str:
           factory = profiles.get(role)
           if not factory:
               return f"(no such role: {role})"
           child_brain, child_reg = factory()
           # Swap the placeholder query for the task this researcher was given.
           if role == "researcher" and child_brain.script:
               child_brain.script[0] = Use(f"Researching: {task}",
                                           [("web_search", {"query": task})])
           child_ctx = ToolContext(vfs=vfs, memory=memory, skills=skills,
                                   spawn=spawn)
           child_engine = QueryEngine(
               brain=child_brain, registry=child_reg, ctx=child_ctx,
               perms=PermissionChecker(PermissionMode.AUTO),
               hooks=HookManager(), system_prompt="(subagent)",
               approve=auto_approve, max_turns=6)
           print(f"        🧑‍🔧 spawned [{role}] for: {short(task, 60)}")
           result = await child_engine.run(task, on_event=None)
           team.append({"role": role, "task": task, "result": result})
           return result
       return spawn
   ctx = ToolContext(vfs=vfs, memory=memory, skills=skills, spawn=make_spawn())
   registry = build_registry()
   lead_script = [
       Use("I'll split this: research vector databases AND agent harnesses in "
           "parallel, then have a writer combine the findings.",
           [("spawn_agent", {"role": "researcher",
                             "task": "vector database for RAG"}),
            ("spawn_agent", {"role": "researcher",
                             "task": "agent harness design"})]),
       Use("Both research notes are in — delegating synthesis to the writer.",
           [("spawn_agent", {"role": "writer",
                             "task": "combine the two research notes"})]),
       lambda m: Say("Coordination complete: 2 researchers (parallel) + 1 "
                     "writer produced a combined brief."),
   ]
   engine = QueryEngine(brain=ScriptedBrain(lead_script), registry=registry,
                        ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
                        hooks=HookManager(), system_prompt="(lead agent)",
                        max_turns=8)
   print("\n[running the lead agent]\n")
   t0 = time.time()
   final = await engine.run("Produce a short brief on building RAG agents.")
   dt = time.time() - t0
   print(f"\n  FINAL: {final}")
   print(f"\n  Team registry ({len(team)} subagent runs, total {dt:.3f}s):")
   for entry in team:
       print(f"    - [{entry['role']}] {short(entry['task'], 40)} -> "
             f"{short(entry['result'], 80)}")
   print("\n  TAKEAWAY: the same loop nests — a 'tool' can be an entire agent, "
         "enabling parallel teams and delegation.")
async def demo_real_provider():
   explain(
       "DEMO 7 — Swap in a REAL model (Anthropic / OpenAI-compatible)",
       """Everything above ran on a deterministic mock brain — zero keys, zero
       cost. Going live changes exactly ONE thing: the brain. The engine, tools,
       permissions, hooks, skills, memory, and coordinator are untouched. This
       is the whole point of a harness: the model is pluggable.""")
   print(textwrap.dedent("""
         To run the SAME harness on a real model, set environment variables and
         re-run (works with any OpenAI- or Anthropic-compatible endpoint that
         OpenHarness supports: Claude, GPT, Kimi, GLM, DeepSeek, Qwen, Groq,
         Ollama, OpenRouter, ...):
             import os
             os.environ["USE_REAL_LLM"]    = "1"
             # --- Anthropic-style ---
             os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
             os.environ["MODEL"]             = "claude-sonnet-4-6"
             # --- or OpenAI-style (incl. local Ollama) ---
             # os.environ["OPENAI_API_KEY"]  = "sk-..."
             # os.environ["OPENAI_BASE_URL"] = "http://localhost:11434/v1"
             # os.environ["MODEL"]           = "llama-3.3-70b"
         Then build the engine with the real brain instead of the mock:
             brain = make_real_brain(system=system_prompt) or ScriptedBrain([...])
             engine = QueryEngine(brain=brain, registry=registry, ctx=ctx, ...)
             await engine.run("Refactor utils.py and add tests.")
   """))
   sysprompt = assemble_system_prompt(
       base=BASE_SYSTEM, project_context="", memory="",
       skills_summary="(none)", tool_names=build_registry().names())
   real = make_real_brain(system=sysprompt)
   if real is None:
       print("  [USE_REAL_LLM not set → staying on the mock brain. "
             "Set the env vars above and re-run to go live.]")
       return
   print(f"  [LIVE] Using real provider: {real.api_format} / {real.model}\n")
   vfs = VirtualFS()
   ctx = ToolContext(vfs=vfs, memory=MemoryStore(
       os.path.join(tempfile.gettempdir(), "oh_real.md")),
       skills=SkillLibrary(), canned_answers={})
   engine = QueryEngine(
       brain=RetryingBrain(real), registry=build_registry(), ctx=ctx,
       perms=PermissionChecker(PermissionMode.AUTO), hooks=HookManager(),
       system_prompt=sysprompt, cost=CostMeter(real.model), max_turns=12)
   final = await engine.run(
       "Create greet.py with a function greet(name) that returns "
       "'Hello, <name>!', then write and run a quick test to prove it works.")
   print(f"\n  FINAL: {final}")
   print(f"\n  Files:\n{vfs.tree()}")
   print(f"\n  💰 {engine.cost.summary()}")
async def main():
   banner("OpenHarness From Scratch — guided walkthrough")
   print(textwrap.dedent("""
       We will build up the harness one subsystem at a time:
         1. The agent loop  (tools, run/verify/fix, retries, cost)
         2. Permissions     (modes, sensitive paths, rules, hook veto)
         3. Skills          (on-demand knowledge)
         4. Memory          (persistent MEMORY.md across sessions)
         5. Compaction      (surviving long sessions)
         6. Multi-agent     (parallel subagent delegation)
         7. Real provider   (one-line swap to a live model)
       Architecture (what each piece is responsible for):
           User prompt
                │
                ▼
           QueryEngine ──► LLM brain (mock or real)   "WHAT to do"
                │  ▲            │ tool_use
                │  └────────────┘
                ▼
           For each tool call:  Permission ─► PreHook ─► Execute ─► PostHook
                                     │           │          │          │
                                 deny/ask     veto/edit   sandbox    redact
                │
                ▼
           Tool result ──► back into the transcript ──► loop
   """).rstrip())
   await demo_agent_loop()
   await demo_permissions()
   await demo_skills()
   await demo_memory()
   await demo_compaction()
   await demo_multi_agent()
   await demo_real_provider()
   banner("All demos complete 🎉")
   print(textwrap.dedent("""
       You just built the core of an agent harness:
         • a streaming tool-call loop with retries & cost tracking
         • type-validated, self-describing tools
         • layered governance (permission modes + lifecycle hooks)
         • on-demand skills and persistent memory
         • context auto-compaction
         • nested multi-agent coordination
         • a one-line swap to a real LLM provider
       To go deeper, study the real project: https://github.com/HKUDS/OpenHarness
       (43+ tools, plugin ecosystem, MCP client, React/Ink TUI, the `oh` CLI,
       and the `ohmo` personal agent). "The model is the agent; the code is the
       harness."
   """))
run_async(main())

We complete the tutorial with memory, context compaction, multi-agent coordination, real-provider switching, and the final guided walkthrough. We show how memory persists across sessions, how old context gets summarized, and how a lead agent delegates work to parallel subagents before synthesizing results. We finish by running all demos through the main function, yielding a complete, runnable view of an OpenHarness-style agent system.
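
To make the parallel delegation step concrete, here is a minimal standalone sketch of the fan-out pattern, separate from the tutorial's own QueryEngine. The names run_subagent, dispatch_turn, and lead_agent are hypothetical, and the sleep stands in for model and tool latency. It assumes that spawn requests issued in the same turn are awaited together with asyncio.gather, while a later turn waits for the earlier one to finish.

```python
import asyncio
import time


async def run_subagent(role: str, task: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for a child harness run; the sleep simulates latency."""
    await asyncio.sleep(delay)
    return f"[{role}] finished: {task}"


async def dispatch_turn(calls: list[tuple[str, str]]) -> list[str]:
    """Run every spawn request from one turn concurrently, preserving request order."""
    return await asyncio.gather(*(run_subagent(role, task) for role, task in calls))


async def lead_agent() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Turn 1: both researchers are requested together, so their waits overlap.
    notes = await dispatch_turn([
        ("researcher", "vector database for RAG"),
        ("researcher", "agent harness design"),
    ])
    # Turn 2: the writer is requested only after both research notes exist.
    brief = await dispatch_turn([("writer", "combine the two research notes")])
    return notes + brief, time.perf_counter() - start


results, elapsed = asyncio.run(lead_agent())
for line in results:
    print(line)
print(f"elapsed: {elapsed:.2f}s")
```

With these simulated delays, the two researchers share one waiting period, so the whole run should take roughly two delays instead of three. Inside a notebook with a live event loop, asyncio.run would need to be replaced by a helper such as the tutorial's run_async.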

Conclusion

In conclusion, we now have a complete, hands-on understanding of how an agent harness connects language-model reasoning to real, controlled actions. We saw how each subsystem contributes to reliability: tools make the agent capable, permissions keep execution safe, hooks add governance, memory preserves useful preferences, skills load knowledge only when needed, and compaction helps longer sessions stay manageable. We also explored how the same harness can support deterministic mock brains, real LLM providers, and nested subagents that work together on delegated tasks.
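
As a reminder of how compaction keeps long sessions manageable, the following is a simplified sketch of the idea and not the tutorial's maybe_compact. Msg, estimate_tokens, and compact are hypothetical names, the four-characters-per-token estimate is an assumed heuristic, and the summary is a plain truncation where a real harness would more likely ask a model to write it.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    role: str
    content: str


def estimate_tokens(msgs: list[Msg]) -> int:
    """Rough heuristic (an assumption): about four characters per token."""
    return sum(len(m.content) // 4 + 1 for m in msgs)


def compact(msgs: list[Msg], max_tokens: int, keep_last: int) -> list[Msg]:
    """Fold the middle of the transcript into one summary message.

    Assumes keep_last >= 1. Runs a single pass, so the result is smaller
    but not guaranteed to fit inside max_tokens.
    """
    if estimate_tokens(msgs) <= max_tokens or len(msgs) <= keep_last + 1:
        return msgs
    head, middle, tail = msgs[0], msgs[1:-keep_last], msgs[-keep_last:]
    # Placeholder summary: keep the first 40 characters of each dropped message.
    lines = [f"- {m.role}: {m.content[:40]}" for m in middle]
    summary = Msg("user", "[summary of earlier turns]\n" + "\n".join(lines))
    return [head, summary, *tail]


history = [Msg("system", "You are a coding agent.")]
history += [
    Msg("user" if i % 2 == 0 else "assistant", f"turn {i}: " + "x" * 200)
    for i in range(12)
]
smaller = compact(history, max_tokens=300, keep_last=4)
print(len(history), "->", len(smaller), "messages")
print(estimate_tokens(history), "->", estimate_tokens(smaller), "tokens (estimated)")
```

The sketch keeps the first message and the most recent turns verbatim and places the summary at index 1, which matches where the compaction demo reads the injected summary from.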



The post How to Design an OpenHarness Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination appeared first on MarkTechPost.
