A Coding Deep Dive into Agentic UI, Generative UI, State Synchronization, and Interrupt-Driven Approval Flows
In this tutorial, we build a complete Agentic UI stack from the ground up in plain Python, without relying on external frameworks to abstract away the core concepts. We implement the AG-UI event stream to make agent behavior observable in real time, and we bring in A2UI as a declarative layer that lets interfaces be defined as structured JSON rather than executable code. As we progress, we let an LLM generate full user interfaces from natural language, synchronize agent and UI state through JSON Patch updates, and enforce human-in-the-loop safety for critical actions. By the end, we have a clear, end-to-end understanding of how agent reasoning turns into interactive, protocol-compliant user interfaces.
import subprocess, sys

# Install dependencies quietly (notebook-style setup).
for pkg in ["openai", "rich", "pydantic"]:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

import os, getpass

# Resolve the API key: environment variable first, then Colab Secrets, then a hidden prompt.
if os.environ.get("OPENAI_API_KEY"):
    API_KEY = os.environ["OPENAI_API_KEY"]
    print("Using OPENAI_API_KEY from environment.")
else:
    try:
        from google.colab import userdata
        API_KEY = userdata.get("OPENAI_API_KEY")
        print("Using OPENAI_API_KEY from Colab Secrets.")
    except Exception:
        API_KEY = getpass.getpass("Enter your OpenAI API key (hidden): ")
        print("API key received.")

BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
import json, re, time, uuid, copy, textwrap
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Any, Optional, Generator
from pydantic import BaseModel, Field
from openai import OpenAI
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
from rich.text import Text
from rich.markdown import Markdown
from rich import box

console = Console(width=105)
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

def llm(messages, **kw):
    """Call the chat completions endpoint; return None on any error."""
    try:
        return client.chat.completions.create(model=MODEL, messages=messages, temperature=0.2, **kw)
    except Exception as e:
        console.print(f"[red]LLM error: {e}[/]")
        return None

def hdr(n, title, sub=""):
    """Print a numbered section header panel."""
    console.print()
    console.rule(f"[bold cyan]SECTION {n}", style="cyan")
    body = f"[bold white]{title}[/]\n[dim]{sub}[/]" if sub else f"[bold white]{title}[/]"
    console.print(Panel(body, border_style="cyan", padding=(1, 2)))
hdr(1, "AG-UI Protocol: Event System",
    "The real AG-UI protocol uses ~16 event types streamed via SSE.\n"
    "We implement all core event types and a streaming emitter in pure Python.")

class AGUIEventType(str, Enum):
    RUN_STARTED = "RUN_STARTED"
    RUN_FINISHED = "RUN_FINISHED"
    RUN_ERROR = "RUN_ERROR"
    TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
    TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
    TEXT_MESSAGE_END = "TEXT_MESSAGE_END"
    TOOL_CALL_START = "TOOL_CALL_START"
    TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
    TOOL_CALL_RESULT = "TOOL_CALL_RESULT"
    TOOL_CALL_END = "TOOL_CALL_END"
    STATE_SNAPSHOT = "STATE_SNAPSHOT"
    STATE_DELTA = "STATE_DELTA"
    INTERRUPT = "INTERRUPT"
    CUSTOM = "CUSTOM"
    STEP_STARTED = "STEP_STARTED"
    STEP_FINISHED = "STEP_FINISHED"

@dataclass
class AGUIEvent:
    type: AGUIEventType
    data: dict = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)

    def to_sse(self) -> str:
        # One SSE frame: an event name line, a data line, and a blank-line terminator.
        payload = {"type": self.type.value, "id": self.event_id, **self.data}
        return f"event: ag-ui\ndata: {json.dumps(payload)}\n\n"

    def to_json(self) -> dict:
        return {"type": self.type.value, "id": self.event_id, "ts": self.timestamp, **self.data}

class AGUIEventStream:
    def __init__(self):
        self.events: list[AGUIEvent] = []
        self.listeners: list = []

    def emit(self, event: AGUIEvent):
        # Record the event, then notify every subscribed listener in order.
        self.events.append(event)
        for listener in self.listeners:
            listener(event)

    def on(self, callback):
        self.listeners.append(callback)

    def replay(self) -> list[dict]:
        return [e.to_json() for e in self.events]
def demo_agui_lifecycle():
    stream = AGUIEventStream()
    event_colors = {
        "RUN_": "bold green", "TEXT_": "cyan", "TOOL_": "magenta",
        "STATE_": "yellow", "INTERRUPT": "bold red", "STEP_": "dim",
    }

    def frontend_listener(event: AGUIEvent):
        # Pick a color from the event-type prefix and print a one-line summary.
        color = "white"
        for prefix, c in event_colors.items():
            if event.type.value.startswith(prefix):
                color = c
                break
        detail = json.dumps(event.data)[:80] if event.data else ""
        console.print(f" [{color}]{event.type.value:.<28}[/] {detail}")

    stream.on(frontend_listener)
    run_id = str(uuid.uuid4())[:8]
    console.print("[bold]Simulating full AG-UI agent run...[/]\n")
    stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id}))
    stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "analyzing_query", "label": "Understanding request"}))
    stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "analyzing_query"}))

    # Stream an assistant message token by token.
    msg_id = str(uuid.uuid4())[:8]
    stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START, {"message_id": msg_id, "role": "assistant"}))
    for chunk in ["I'll ", "look up ", "the data ", "and build ", "a dashboard ", "for you."]:
        stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT, {"message_id": msg_id, "delta": chunk}))
    stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id}))

    # Stream a tool call: start, arguments, result, end.
    tool_id = str(uuid.uuid4())[:8]
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_START, {"tool_call_id": tool_id, "name": "query_database"}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_ARGS, {"tool_call_id": tool_id, "args_delta": '{"query": "SELECT revenue FROM sales"}'}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_RESULT, {"tool_call_id": tool_id, "result": [{"month": "Jan", "revenue": 42000}, {"month": "Feb", "revenue": 58000}]}))
    stream.emit(AGUIEvent(AGUIEventType.TOOL_CALL_END, {"tool_call_id": tool_id}))

    stream.emit(AGUIEvent(AGUIEventType.STATE_SNAPSHOT, {
        "state": {"active_agent": "DataAnalyst", "stage": "rendering", "progress": 0.75}
    }))
    stream.emit(AGUIEvent(AGUIEventType.STATE_DELTA, {
        "delta": [{"op": "replace", "path": "/progress", "value": 1.0}]
    }))
    stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
        "reason": "high_risk_action",
        "description": "Agent wants to send an email to all 5,000 customers.",
        "options": ["approve", "reject", "modify"],
    }))
    stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "completed"}))

    console.print(Panel(
        stream.events[3].to_sse(),
        title="[bold]Example SSE wire format (how it looks on the network)",
        border_style="dim",
    ))

    # Count events per category (the type name minus its last "_" segment).
    table = Table(title="AG-UI Event Stream Summary", box=box.ROUNDED)
    table.add_column("Category", style="cyan", width=15)
    table.add_column("Events", justify="center", style="green")
    counts = {}
    for e in stream.events:
        cat = e.type.value.rsplit("_", 1)[0] if "_" in e.type.value else e.type.value
        counts[cat] = counts.get(cat, 0) + 1
    for cat, n in counts.items():
        table.add_row(cat, str(n))
    table.add_row("[bold]TOTAL", f"[bold]{len(stream.events)}")
    console.print(table)

demo_agui_lifecycle()
We start by building the backbone of every agentic frontend: the AG-UI event stream. We implement 16 event types modeled on the AG-UI specification (lifecycle events, token-by-token text streaming, streamed tool calls, state snapshots, deltas, and interrupt signals) and serialize them into the SSE wire format that production systems use over HTTP. We then wire up a frontend listener that reacts to each event as it arrives, simulating what a React or Flutter app would experience when consuming this stream.
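On the consuming side, a frontend has to split the text stream back into events and stitch the text deltas together. The following is a minimal sketch of that step, assuming the simplified frame layout produced by `to_sse` (one `event:` line, one `data:` line, and a blank-line terminator). The helper names `parse_sse_frames` and `assemble_text` are hypothetical and not part of any AG-UI SDK.

```python
import json

def parse_sse_frames(raw: str) -> list[dict]:
    """Split a raw SSE text buffer into decoded JSON payloads.

    Assumption: frames are separated by a blank line and carry their JSON
    in `data:` lines, matching the simplified to_sse() format.
    """
    payloads = []
    for frame in raw.split("\n\n"):
        data_lines = [
            line[len("data:"):].strip()
            for line in frame.splitlines()
            if line.startswith("data:")
        ]
        if not data_lines:
            continue  # skip the empty trailing chunk and frames without data
        payloads.append(json.loads("\n".join(data_lines)))
    return payloads

def assemble_text(payloads: list[dict]) -> dict[str, str]:
    """Concatenate TEXT_MESSAGE_CONTENT deltas per message_id."""
    messages: dict[str, str] = {}
    for payload in payloads:
        if payload.get("type") == "TEXT_MESSAGE_CONTENT":
            mid = payload["message_id"]
            messages[mid] = messages.get(mid, "") + payload["delta"]
    return messages

# A hand-written buffer in the same shape as the frames emitted above.
wire = (
    'event: ag-ui\ndata: {"type": "TEXT_MESSAGE_START", "message_id": "m1"}\n\n'
    'event: ag-ui\ndata: {"type": "TEXT_MESSAGE_CONTENT", "message_id": "m1", "delta": "Hello, "}\n\n'
    'event: ag-ui\ndata: {"type": "TEXT_MESSAGE_CONTENT", "message_id": "m1", "delta": "world."}\n\n'
    'event: ag-ui\ndata: {"type": "TEXT_MESSAGE_END", "message_id": "m1"}\n\n'
)
events = parse_sse_frames(wire)
text_by_message = assemble_text(events)
print(text_by_message)
```

A real client would read the buffer incrementally from an HTTP response and keep any partial frame for the next read; this sketch only handles a complete buffer.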
hdr(2, "A2UI: Declarative Component Trees",
    "Google's A2UI spec: agents emit flat JSON component lists with ID refs.\n"
    "The client's widget registry maps types → native widgets.\n"
    "Safe like data, expressive like code. No executable code sent.")

class A2UIMessageType(str, Enum):
    CREATE_SURFACE = "createSurface"
    UPDATE_COMPONENTS = "updateComponents"
    UPDATE_DATA_MODEL = "updateDataModel"
    DELETE_SURFACE = "deleteSurface"

@dataclass
class A2UIComponent:
    id: str
    type: str
    properties: dict = field(default_factory=dict)
    children: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        d = {"id": self.id, "type": self.type, **self.properties}
        if self.children:
            d["children"] = self.children
        return d

@dataclass
class A2UIDataModel:
    data: dict = field(default_factory=dict)

    def get_binding(self, path: str) -> Any:
        # Walk a "/a/b/0"-style path through nested dicts and lists.
        parts = [p for p in path.split("/") if p]
        val = self.data
        for p in parts:
            if isinstance(val, dict):
                val = val.get(p)
            elif isinstance(val, list) and p.isdigit() and int(p) < len(val):
                val = val[int(p)]
            else:
                return None
        return val

@dataclass
class A2UISurface:
    surface_id: str
    components: list[A2UIComponent] = field(default_factory=list)
    data_model: A2UIDataModel = field(default_factory=A2UIDataModel)

    def to_messages(self) -> list[dict]:
        # Emit the message sequence: create surface, send components, then the data model.
        msgs = []
        msgs.append({
            "type": A2UIMessageType.CREATE_SURFACE.value,
            "surfaceId": self.surface_id,
        })
        msgs.append({
            "type": A2UIMessageType.UPDATE_COMPONENTS.value,
            "surfaceId": self.surface_id,
            "components": [c.to_dict() for c in self.components],
        })
        if self.data_model.data:
            msgs.append({
                "type": A2UIMessageType.UPDATE_DATA_MODEL.value,
                "surfaceId": self.surface_id,
                "dataModel": self.data_model.data,
            })
        return msgs
class WidgetRegistry:
    def __init__(self):
        self._renderers = {}

    def register(self, component_type: str, render_fn):
        self._renderers[component_type] = render_fn

    def render(self, component: A2UIComponent, surface: A2UISurface, indent: int = 0):
        fn = self._renderers.get(component.type)
        if fn:
            fn(component, surface, indent)
        else:
            pad = " " * indent
            console.print(f"{pad}[dim]⟨{component.type} id={component.id}⟩ (no renderer)[/]")

    def render_tree(self, surface: A2UISurface):
        comp_map = {c.id: c for c in surface.components}
        # Roots are components that no other component lists as a child.
        all_children = set()
        for c in surface.components:
            all_children.update(c.children)
        roots = [c for c in surface.components if c.id not in all_children]

        def _render(comp_id: str, indent: int):
            comp = comp_map.get(comp_id)
            if not comp:
                return
            self.render(comp, surface, indent)
            for child_id in comp.children:
                _render(child_id, indent + 1)

        for root in roots:
            _render(root.id, 0)

registry = WidgetRegistry()

def _resolve(comp, surface, key, default=None):
    """Return a property value, preferring a data-model binding when one resolves."""
    val = comp.properties.get(key, default)
    binding = comp.properties.get("dataBinding")
    if binding and isinstance(binding, str) and binding.startswith("/"):
        resolved = surface.data_model.get_binding(binding)
        if resolved is not None:
            return resolved
    # A property value that itself looks like "/a/b" is treated as an inline binding.
    if isinstance(val, str) and val.startswith("/") and "/" in val[1:]:
        resolved = surface.data_model.get_binding(val)
        if resolved is not None:
            return resolved
    return val

def _to_float(val, default=0.0):
    """Coerce numbers and strings such as "68%" or "68" to a 0..1 fraction."""
    if isinstance(val, (int, float)):
        return float(val)
    if isinstance(val, str):
        cleaned = val.strip().rstrip("%")
        try:
            f = float(cleaned)
            if "%" in val or f > 1:
                return f / 100.0
            return f
        except ValueError:
            return default
    return default
def render_card(comp, surface, indent):
    pad = " " * indent
    title = str(_resolve(comp, surface, "title", "Card"))
    console.print(f"{pad}┌─{'─' * 50}─┐")
    console.print(f"{pad}│ [bold]{title:^50}[/] │")
    console.print(f"{pad}├─{'─' * 50}─┤")
    if not comp.children:
        # Childless card: print the optional subtitle and close the box.
        subtitle = str(_resolve(comp, surface, "subtitle", ""))
        if subtitle:
            console.print(f"{pad}│ {subtitle:<49}│")
        console.print(f"{pad}└─{'─' * 50}─┘")

def render_text(comp, surface, indent):
    pad = " " * indent
    text = _resolve(comp, surface, "text", "")
    style = comp.properties.get("style", "body")
    styles = {"headline": "bold white", "body": "white", "caption": "dim", "label": "bold cyan"}
    console.print(f"{pad}[{styles.get(style, 'white')}]{text}[/]")

def render_button(comp, surface, indent):
    pad = " " * indent
    label = str(_resolve(comp, surface, "label", "Button"))
    variant = comp.properties.get("variant", "primary")
    colors = {"primary": "bold white on blue", "secondary": "white on grey30", "danger": "bold white on red"}
    console.print(f"{pad} [{colors.get(variant, 'white')}] {label} [/]")

def render_text_field(comp, surface, indent):
    pad = " " * indent
    label = comp.properties.get("label", "Input")
    placeholder = comp.properties.get("placeholder", "")
    console.print(f"{pad} {label}: [dim]┌──────────────────────────┐[/]")
    console.print(f"{pad} [dim]│ {placeholder:<25}│[/]")
    console.print(f"{pad} [dim]└──────────────────────────┘[/]")

def render_row(comp, surface, indent):
    # Layout containers print nothing themselves; their children render below them.
    pass

def render_column(comp, surface, indent):
    pass

def render_image(comp, surface, indent):
    pad = " " * indent
    alt = comp.properties.get("alt", "image")
    # The backslash escapes "[" so rich prints the alt text instead of parsing it as a markup tag.
    console.print(f"{pad} [dim]\\[{alt}][/]")

def render_divider(comp, surface, indent):
    pad = " " * indent
    console.print(f"{pad} {'─' * 50}")

def render_chip(comp, surface, indent):
    pad = " " * indent
    label = str(_resolve(comp, surface, "label", ""))
    console.print(f"{pad} [on grey23] {label} [/]")

def render_progress(comp, surface, indent):
    pad = " " * indent
    raw_value = _resolve(comp, surface, "value", 0)
    value = max(0.0, min(1.0, _to_float(raw_value, 0.0)))
    label = str(_resolve(comp, surface, "label", ""))
    bar_len = int(value * 40)
    bar = f"[green]{'█' * bar_len}[/][dim]{'░' * (40 - bar_len)}[/]"
    console.print(f"{pad} {label}: {bar} {value*100:.0f}%")

# Map abstract A2UI type strings to terminal renderers.
for name, fn in [
    ("card", render_card), ("text", render_text), ("button", render_button),
    ("text-field", render_text_field), ("row", render_row), ("column", render_column),
    ("image", render_image), ("divider", render_divider), ("chip", render_chip),
    ("progress-bar", render_progress),
]:
    registry.register(name, fn)
console.print("\n[bold]Demo: A2UI booking form - agent generates a restaurant reservation UI[/]\n")
booking_surface = A2UISurface(
    surface_id="booking-form-1",
    components=[
        A2UIComponent("root", "card", {"title": "Reserve a Table"}, children=["c1", "c2", "c3", "c4", "c5", "c6"]),
        A2UIComponent("c1", "text", {"text": "", "dataBinding": "/restaurant/name", "style": "headline"}),
        A2UIComponent("c2", "text", {"text": "", "dataBinding": "/restaurant/cuisine", "style": "caption"}),
        A2UIComponent("c3", "divider", {}),
        A2UIComponent("c4", "text-field", {"label": "Date", "placeholder": "YYYY-MM-DD"}),
        A2UIComponent("c5", "text-field", {"label": "Guests", "placeholder": "1-12"}),
        A2UIComponent("c6", "button", {"label": "Reserve Now", "variant": "primary", "action": "submit_booking"}),
    ],
    data_model=A2UIDataModel({"restaurant": {"name": "Chez Laurent", "cuisine": "French Contemporary • $$$$"}})
)
console.print(Panel(
    "\n".join(json.dumps(m, indent=2)[:200] for m in booking_surface.to_messages()),
    title="[bold]A2UI JSONL stream (what goes over the wire)",
    border_style="yellow",
))
console.print("[bold]Rendered by client widget registry:[/]\n")
registry.render_tree(booking_surface)
console.print()

t = Table(title="A2UI Flat Component List (Adjacency Model)", box=box.ROUNDED)
t.add_column("ID", style="cyan", width=8)
t.add_column("Type", style="green", width=14)
t.add_column("Children", style="yellow", width=20)
t.add_column("Bindings", style="magenta", width=25)
for c in booking_surface.components:
    binding = c.properties.get("dataBinding", "")
    t.add_row(c.id, c.type, ", ".join(c.children) if c.children else "-", binding or "-")
console.print(t)
We implement Google’s A2UI specification: a flat adjacency-list model in which components reference children by ID rather than nesting, which makes the format easy to stream and easy for LLMs to generate incrementally. We build a client-side Widget Registry that maps abstract type strings like “card”, “text-field”, and “progress-bar” to concrete terminal renderers, mirroring how a production app maps them to React components or Flutter widgets. We demonstrate the full cycle with a restaurant booking form, complete with data model bindings that decouple dynamic values from UI structure, as the A2UI spec prescribes.
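To make the adjacency-list idea concrete, here is a minimal sketch that turns a flat component list into nested dictionaries and rejects dangling references and cycles. It works on plain dicts in the same shape as the `components` array; the function name `build_tree` is a hypothetical helper for illustration, not something the A2UI spec defines.

```python
def build_tree(components: list[dict]) -> list[dict]:
    """Nest a flat, ID-referenced component list.

    Roots are components that no other component lists as a child.
    Raises ValueError for unknown child IDs, for cycles reachable from a
    root, and when every component is referenced (so no root exists).
    """
    by_id = {c["id"]: c for c in components}
    referenced = {cid for c in components for cid in c.get("children", [])}
    missing = referenced - set(by_id)
    if missing:
        raise ValueError(f"unknown child ids: {sorted(missing)}")

    def expand(cid: str, path: frozenset) -> dict:
        if cid in path:
            raise ValueError(f"cycle detected at {cid!r}")
        source = by_id[cid]
        node = {k: v for k, v in source.items() if k != "children"}
        node["children"] = [expand(ch, path | {cid}) for ch in source.get("children", [])]
        return node

    roots = [c["id"] for c in components if c["id"] not in referenced]
    if components and not roots:
        raise ValueError("no root component: every id is referenced as a child")
    return [expand(rid, frozenset()) for rid in roots]

flat = [
    {"id": "root", "type": "card", "title": "Reserve a Table", "children": ["c1", "c2"]},
    {"id": "c1", "type": "text", "text": "Chez Laurent"},
    {"id": "c2", "type": "button", "label": "Reserve Now"},
]
tree = build_tree(flat)
```

Because each component is a self-contained record, a client can accept components in any order and only resolve the references once the list is complete, which is what makes the flat form convenient to stream.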
hdr(3, "Generative UI: LLM Produces Live Interfaces",
    "The agent generates A2UI component trees dynamically based on the query.\n"
    "This is the core of 'Generative UI': context-adaptive interfaces\n"
    "that go far beyond text-only chat responses.")

A2UI_GENERATION_PROMPT = """
You are an A2UI Generative UI agent. Given a user query, you generate a rich
interactive interface, NOT text. You output an A2UI component tree as JSON.

RULES:
1. Output a flat list of components using the adjacency model (children = list of IDs).
2. Available component types: card, text, button, text-field, row, column, divider, chip, image, progress-bar, select, date-picker, data-table
3. Include a separate "dataModel" object for dynamic values. Use "/path/to/value" bindings.
4. The ROOT component should be a "card" with all others as descendants.
5. Think about what UI BEST serves the user: forms for input, tables for data,
   progress bars for status, chips for tags, buttons for actions.

OUTPUT FORMAT (strict JSON, nothing else):
{
  "surfaceId": "unique-id",
  "components": [
    {"id": "root", "type": "card", "title": "...", "children": ["c1", "c2"]},
    {"id": "c1", "type": "text", "text": "...", "style": "headline"},
    ...
  ],
  "dataModel": { ... }
}
"""
def generate_ui(user_query: str) -> Optional[A2UISurface]:
    console.print(f" [dim]Generating UI for:[/] [bold]{user_query}[/]")
    response = llm([
        {"role": "system", "content": A2UI_GENERATION_PROMPT},
        {"role": "user", "content": user_query},
    ], max_tokens=1200)
    if not response:
        return None
    raw = response.choices[0].message.content or ""
    try:
        # Strip Markdown code fences the model may wrap around the JSON.
        cleaned = re.sub(r'```json\s*|\s*```', '', raw).strip()
        spec = json.loads(cleaned)
    except json.JSONDecodeError:
        console.print(f"[red]Failed to parse generated UI: {raw[:200]}[/]")
        return None

    # Everything except id/type/children becomes a component property.
    components = []
    for c in spec.get("components", []):
        components.append(A2UIComponent(
            id=c.get("id", str(uuid.uuid4())[:6]),
            type=c.get("type", "text"),
            properties={k: v for k, v in c.items() if k not in ("id", "type", "children")},
            children=c.get("children", []),
        ))
    surface = A2UISurface(
        surface_id=spec.get("surfaceId", f"gen-{uuid.uuid4().hex[:6]}"),
        components=components,
        data_model=A2UIDataModel(spec.get("dataModel", {})),
    )
    return surface

def demo_generative_ui(query: str):
    surface = generate_ui(query)
    if surface:
        console.print(f"\n[bold green]Generated {len(surface.components)} components:[/]")
        registry.render_tree(surface)
        console.print()
        types = {}
        for c in surface.components:
            types[c.type] = types.get(c.type, 0) + 1
        console.print(" [dim]Component types used:[/] " + ", ".join(f"[cyan]{t}[/]×{n}" for t, n in types.items()))
        if surface.data_model.data:
            console.print(f" [dim]Data model keys:[/] {list(surface.data_model.data.keys())}")
        console.print()

console.print("\n[bold]Demo 1: Agent generates an onboarding form[/]")
demo_generative_ui(
    "Create a user onboarding flow: collect name, email, role (dropdown), "
    "preferred notification method (chips), and a 'Get Started' button."
)
console.print("\n[bold]Demo 2: Agent generates a data dashboard[/]")
demo_generative_ui(
    "Show a project status dashboard with: project name 'Atlas v2', "
    "4 team members, sprint progress at 68%, 3 blockers flagged as critical, "
    "and action buttons for 'View Backlog' and 'Schedule Standup'."
)
console.print("\n[bold]Demo 3: Agent generates a confirmation dialog[/]")
demo_generative_ui(
    "Show a payment confirmation: $2,450 charge to Visa ending 4242, "
    "order #ORD-8891, with Approve and Decline buttons."
)
We hand the keys to the LLM and let it generate complete A2UI component trees at runtime from plain English descriptions; this is Generative UI in its purest form. We prompt the model with the A2UI schema and component catalog, and it produces fully structured surfaces with cards, forms, chips, progress bars, and data bindings, choosing the best UI pattern for each query. We run three demos (an onboarding flow, a project dashboard, and a payment confirmation) to show how the same agent adapts its interface to very different contexts without a single hardcoded layout. Note that the prompt's catalog also lists select, date-picker, and data-table, for which the registry has no renderer, so those components fall back to the "(no renderer)" placeholder.
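Since the component tree comes straight from a model, it is worth checking it before rendering. The sketch below is one possible validation pass over the parsed JSON, assuming the same top-level shape as the output format in the prompt. `validate_spec` and `KNOWN_TYPES` are hypothetical names introduced here, and the checks (unique string IDs, known types, resolvable child references) are a reasonable minimum rather than anything the A2UI spec mandates.

```python
# Component catalog copied from the generation prompt (assumed to be the allow-list).
KNOWN_TYPES = {
    "card", "text", "button", "text-field", "row", "column", "divider",
    "chip", "image", "progress-bar", "select", "date-picker", "data-table",
}

def validate_spec(spec: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the spec passed."""
    components = spec.get("components")
    if not isinstance(components, list) or not components:
        return ["'components' must be a non-empty list"]

    problems: list[str] = []
    seen_ids: set[str] = set()
    for index, comp in enumerate(components):
        if not isinstance(comp, dict):
            problems.append(f"component {index}: not an object")
            continue
        comp_id = comp.get("id")
        if not isinstance(comp_id, str) or not comp_id:
            problems.append(f"component {index}: missing string 'id'")
        elif comp_id in seen_ids:
            problems.append(f"duplicate id {comp_id!r}")
        else:
            seen_ids.add(comp_id)
        comp_type = comp.get("type")
        if not isinstance(comp_type, str) or comp_type not in KNOWN_TYPES:
            problems.append(f"component {index}: unknown type {comp_type!r}")

    # Second pass: every child reference must point at a known ID.
    for comp in components:
        if not isinstance(comp, dict):
            continue
        children = comp.get("children", [])
        if not isinstance(children, list):
            problems.append(f"{comp.get('id')!r}: 'children' must be a list")
            continue
        for child in children:
            if not isinstance(child, str) or child not in seen_ids:
                problems.append(f"{comp.get('id')!r} references unknown child {child!r}")
    return problems

good = {"surfaceId": "s1", "components": [
    {"id": "root", "type": "card", "children": ["c1"]},
    {"id": "c1", "type": "text", "text": "Hi"},
]}
bad = {"components": [
    {"id": "root", "type": "card", "children": ["c1", "c9"]},
    {"id": "c1", "type": "slider"},
    {"id": "c1", "type": "text"},
]}
good_problems = validate_spec(good)
bad_problems = validate_spec(bad)
```

A caller could feed the returned problems back to the model as a repair prompt, or simply refuse to render when the list is non-empty.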
hdr(4, "State Synchronization: Shared State Between Agent & UI",
    "AG-UI syncs state bidirectionally using STATE_SNAPSHOT and STATE_DELTA.\n"
    "The agent IS the state machine; the UI IS the renderer.\n"
    "JSON Patch diffs keep updates minimal and efficient.")

class SharedState:
    def __init__(self, initial: Optional[dict] = None):
        self.state: dict = initial or {}
        self.history: list[dict] = []
        self.version: int = 0

    def snapshot(self) -> AGUIEvent:
        return AGUIEvent(AGUIEventType.STATE_SNAPSHOT, {"state": copy.deepcopy(self.state), "version": self.version})

    def apply_delta(self, operations: list[dict]) -> AGUIEvent:
        """Apply a subset of JSON Patch (replace, add, remove) and return a STATE_DELTA event."""
        for op in operations:
            path_parts = [p for p in op["path"].split("/") if p]
            # Walk to the parent container of the final path segment.
            target = self.state
            for part in path_parts[:-1]:
                if isinstance(target, dict):
                    target = target.setdefault(part, {})
                elif isinstance(target, list) and part.isdigit():
                    target = target[int(part)]
            key = path_parts[-1] if path_parts else None
            if key is None:
                continue
            # Lists are indexed by integer position, dicts by string key.
            is_list_index = isinstance(target, list) and key.isdigit()
            if op["op"] == "replace":
                if is_list_index:
                    target[int(key)] = op["value"]
                else:
                    target[key] = op["value"]
            elif op["op"] == "add":
                if is_list_index:
                    target.insert(int(key), op["value"])
                else:
                    target[key] = op["value"]
            elif op["op"] == "remove":
                if isinstance(target, dict):
                    target.pop(key, None)
                elif is_list_index and int(key) < len(target):
                    target.pop(int(key))
        self.version += 1
        self.history.append({"version": self.version, "ops": operations})
        return AGUIEvent(AGUIEventType.STATE_DELTA, {"delta": operations, "version": self.version})
console.print("\n[bold]Demo: Document review pipeline - 3 agents, shared state[/]\n")
stream = AGUIEventStream()
state = SharedState({
    "document": {"title": "Q4 Strategy Report", "status": "draft", "word_count": 2840},
    "pipeline": {"stage": "research", "progress": 0.0},
    "agents": {"active": "Researcher", "queue": ["Editor", "Reviewer"]},
    "feedback": [],
})

def log_event(event: AGUIEvent):
    if event.type in (AGUIEventType.STATE_SNAPSHOT, AGUIEventType.STATE_DELTA):
        if event.type == AGUIEventType.STATE_DELTA:
            ops = event.data.get("delta", [])
            for op in ops:
                console.print(f" [yellow]STATE_DELTA[/] v{event.data.get('version')}: "
                              f"[cyan]{op['op']}[/] {op['path']} → {op.get('value', '∅')}")
        else:
            console.print(f" [yellow]STATE_SNAPSHOT[/] v{event.data.get('version')}: {list(event.data['state'].keys())}")

stream.on(log_event)
stream.emit(state.snapshot())

console.print("\n[bold green]▸ Researcher agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/pipeline/stage", "value": "research_complete"},
    {"op": "replace", "path": "/pipeline/progress", "value": 0.33},
    {"op": "add", "path": "/feedback/0", "value": {"agent": "Researcher", "note": "Added 4 new data sources"}},
]))

console.print("\n[bold green]▸ Editor agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/agents/active", "value": "Editor"},
    {"op": "replace", "path": "/pipeline/stage", "value": "editing"},
    {"op": "replace", "path": "/pipeline/progress", "value": 0.66},
    {"op": "replace", "path": "/document/word_count", "value": 3150},
]))

console.print("\n[bold green]▸ Reviewer agent working...[/]")
stream.emit(state.apply_delta([
    {"op": "replace", "path": "/agents/active", "value": "Reviewer"},
    {"op": "replace", "path": "/pipeline/stage", "value": "review_complete"},
    {"op": "replace", "path": "/pipeline/progress", "value": 1.0},
    {"op": "replace", "path": "/document/status", "value": "approved"},
]))

console.print(Panel(
    json.dumps(state.state, indent=2),
    title="[bold]Final shared state after pipeline",
    border_style="green",
))

t = Table(title="State History (versions)", box=box.ROUNDED)
t.add_column("Version", style="cyan", justify="center")
t.add_column("Operations", style="yellow")
for h in state.history:
    ops_summary = "; ".join(f"{o['op']} {o['path']}" for o in h["ops"])
    t.add_row(str(h["version"]), ops_summary[:70])
console.print(t)
hdr(5, "Human-in-the-Loop: AG-UI INTERRUPT Events",
    "When an agent hits a high-stakes action, it emits an INTERRUPT event.\n"
    "The frontend renders an approval UI. Execution pauses until the human\n"
    "approves, rejects, or modifies. State is preserved throughout.")

@dataclass
class InterruptRequest:
    interrupt_id: str
    action_description: str
    risk_level: str
    affected_resources: list[str]
    proposed_changes: dict
    options: list[str] = field(default_factory=lambda: ["approve", "reject", "modify"])

@dataclass
class InterruptResponse:
    interrupt_id: str
    decision: str
    modifications: Optional[dict] = None

class InterruptreadyAgent:
    # Keyword → risk level. Keywords are matched as substrings of the lowercased
    # action text, so "email all" is written with a space to match "Email all ...".
    RISK_RULES = {
        "delete": "critical",
        "payment": "critical",
        "email all": "high",
        "publish": "high",
        "update": "medium",
        "read": "low",
    }

    def __init__(self):
        self.stream = AGUIEventStream()
        self.pending_interrupts: dict[str, InterruptRequest] = {}

    def assess_and_maybe_interrupt(self, action: str, details: dict) -> Optional[InterruptRequest]:
        risk = "low"
        for keyword, level in self.RISK_RULES.items():
            if keyword in action.lower():
                risk = level
                break
        if risk in ("critical", "high"):
            interrupt = InterruptRequest(
                interrupt_id=str(uuid.uuid4())[:8],
                action_description=action,
                risk_level=risk,
                affected_resources=details.get("resources", []),
                proposed_changes=details.get("changes", {}),
            )
            self.pending_interrupts[interrupt.interrupt_id] = interrupt
            self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
                "interrupt_id": interrupt.interrupt_id,
                "reason": risk,
                "description": interrupt.action_description,
                "affected_resources": interrupt.affected_resources,
                "proposed_changes": interrupt.proposed_changes,
                "options": interrupt.options,
            }))
            return interrupt
        return None

    def resolve_interrupt(self, response: InterruptResponse) -> str:
        interrupt = self.pending_interrupts.pop(response.interrupt_id, None)
        if not interrupt:
            return "No pending interrupt found."
        if response.decision == "approve":
            return f"APPROVED: '{interrupt.action_description}' executing now."
        elif response.decision == "reject":
            return f"REJECTED: '{interrupt.action_description}' cancelled."
        elif response.decision == "modify":
            return f"MODIFIED: '{interrupt.action_description}' updated with: {response.modifications}"
        return f"Unknown decision: {response.decision}"
console.print("\n[bold]Demo: Agent encounters actions of varying risk levels[/]\n")
agent = InterruptreadyAgent()
actions = [
    ("Read user profile", {"resources": ["user:123"]}),
    ("Update user preferences", {"resources": ["user:123"], "changes": {"theme": "dark"}}),
    ("Delete user account", {"resources": ["user:123", "data:all"], "changes": {"action": "permanent_delete"}}),
    ("Email all 12,000 users", {"resources": ["email:newsletter"], "changes": {"subject": "Big Announcement"}}),
    ("Publish blog post", {"resources": ["post:draft-42"], "changes": {"status": "public"}}),
]

def event_logger(event):
    if event.type == AGUIEventType.INTERRUPT:
        d = event.data
        risk_style = {"critical": "bold red", "high": "bold yellow"}.get(d["reason"], "white")
        console.print("\n [bold]INTERRUPT EVENT[/]")
        console.print(f" Risk: [{risk_style}]{d['reason'].upper()}[/]")
        console.print(f" Action: {d['description']}")
        console.print(f" Affected: {d['affected_resources']}")
        console.print(f" Options: {d['options']}")

agent.stream.on(event_logger)
results = []
for action_desc, details in actions:
    interrupt = agent.assess_and_maybe_interrupt(action_desc, details)
    if interrupt:
        # Simulated human decision: reject critical actions, approve high-risk ones.
        decision = "reject" if interrupt.risk_level == "critical" else "approve"
        result = agent.resolve_interrupt(InterruptResponse(interrupt.interrupt_id, decision))
    else:
        result = f"AUTO-EXECUTED: '{action_desc}' (low risk, no approval needed)"
    results.append((action_desc, result))

console.print()
t = Table(title="Execution Results", box=box.ROUNDED, show_lines=True)
t.add_column("Action", style="white", width=28)
t.add_column("Outcome", style="dim", width=55)
for action_desc, result in results:
    t.add_row(action_desc, result)
console.print(t)
We build a SharedState engine that emits AG-UI STATE_SNAPSHOT and STATE_DELTA events using JSON Patch operations, keeping the agent backend and the frontend UI synchronized through every mutation. We demonstrate this with a three-agent document review pipeline in which a Researcher, Editor, and Reviewer each modify the shared state in sequence, and the frontend sees each change as soon as it is emitted. We then implement the AG-UI INTERRUPT pattern, in which the agent assesses the risk level of each proposed action, emits an interrupt event for any dangerous one, and waits for a human to approve, reject, or modify the plan. In this demo the human decision is simulated: critical actions are rejected and high-risk ones are approved.
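One way to make the agent genuinely pause at an interrupt, rather than resolve it inline, is to write the run as a Python generator: it yields an interrupt payload, stops, and resumes only when the caller sends a decision back. The sketch below illustrates that control flow under simplified assumptions: risk levels are passed in directly instead of being derived from keyword rules, and `run_with_approval`, `drive`, and `policy` are hypothetical names for this example.

```python
from typing import Callable, Generator

def run_with_approval(
    actions: list[tuple[str, str]],
) -> Generator[dict, str, list[str]]:
    """Process (name, risk) pairs, yielding an interrupt for each risky one.

    The generator is suspended at `yield` until the caller sends a decision
    string; only "approve" lets the action run.
    """
    log: list[str] = []
    for name, risk in actions:
        if risk in ("critical", "high"):
            decision = yield {"type": "INTERRUPT", "action": name, "reason": risk}
            if decision != "approve":
                log.append(f"skipped: {name}")
                continue
        log.append(f"executed: {name}")
    return log

def drive(gen: Generator[dict, str, list[str]], decide: Callable[[dict], str]) -> list[str]:
    """Run the generator to completion, answering each interrupt via decide()."""
    try:
        interrupt = next(gen)  # run until the first interrupt (or completion)
        while True:
            interrupt = gen.send(decide(interrupt))  # resume with the decision
    except StopIteration as stop:
        return stop.value  # the generator's return value

def policy(interrupt: dict) -> str:
    # Stand-in for a human: reject critical actions, approve the rest.
    return "reject" if interrupt["reason"] == "critical" else "approve"

outcome = drive(
    run_with_approval([
        ("Read user profile", "low"),
        ("Delete user account", "critical"),
        ("Publish blog post", "high"),
    ]),
    policy,
)
print(outcome)
```

In a real frontend, `decide` would be replaced by whatever delivers the user's click back to the agent; the generator keeps its local state while it waits, which is the property the interrupt pattern relies on.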
hdr(6, "Full Pipeline: LLM-Driven Adaptive UI",
    "The full Agentic UI architecture in a single pipeline:\n"
    " User query → Intent analysis → UI pattern selection →\n"
    " A2UI generation → AG-UI event streaming → State sync → Render")
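UI_ROUTER_PROMPT = """
You are a UI routing agent. Given a user query, decide what type of UI to generate.
RESPOND IN JSON ONLY:
{
  "intent": "...",
  "reasoning": "...",
  "ui_complexity": "...",
  "needs_approval": true or false,
  "data_requirements": [...]
}
"""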
class AgenticUIPipeline:
    def __init__(self):
        self.stream = AGUIEventStream()
        self.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0})
        self.interrupt_agent = InterruptreadyAgent()

    def route(self, query: str) -> dict:
        """Ask the LLM to classify the query; fall back to a default routing on any failure."""
        resp = llm([
            {"role": "system", "content": UI_ROUTER_PROMPT},
            {"role": "user", "content": query},
        ], max_tokens=300)
        if not resp:
            return {"intent": "dashboard", "reasoning": "fallback", "ui_complexity": "simple",
                    "needs_approval": False, "data_requirements": []}
        raw = resp.choices[0].message.content or ""
        try:
            return json.loads(re.sub(r'```json\s*|\s*```', '', raw).strip())
        except json.JSONDecodeError:
            return {"intent": "dashboard", "reasoning": "parse_fallback", "ui_complexity": "simple",
                    "needs_approval": False, "data_requirements": []}
def run(self, user_query: str):
run_id = str(uuid.uuid4())[:8]
console.print(Panel(f"[bold]{user_query}[/]", title="
User Query", border_style="white"))
self.stream.emit(AGUIEvent(AGUIEventType.RUN_STARTED, {"run_id": run_id}))
self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "routing"}))
routing = self.route(user_query)
console.print(f"n [bold cyan]
Router Decision:[/]")
console.print(f" Intent: [green]{routing.get('intent')}[/] | "
f"Complexity: [yellow]{routing.get('ui_complexity')}[/] | "
f"Approval: {'
' if routing.get('needs_approval') else '
'}")
console.print(f" Reasoning: [dim]{routing.get('reasoning', '')}[/]")
self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "routing", "consequence": routing}))
        # Step 2: generate the A2UI surface and narrate progress over the event stream
        self.state.apply_delta([
            {"op": "replace", "path": "/pipeline/stage", "value": "generating"},
        ])
        self.stream.emit(AGUIEvent(AGUIEventType.STEP_STARTED, {"step": "generating_ui"}))
        msg_id = str(uuid.uuid4())[:8]
        self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_START, {"message_id": msg_id}))
        self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_CONTENT, {
            "message_id": msg_id,
            "delta": f"Building a {routing.get('intent')} interface for you..."
        }))
        self.stream.emit(AGUIEvent(AGUIEventType.TEXT_MESSAGE_END, {"message_id": msg_id}))
        surface = generate_ui(user_query)
        self.stream.emit(AGUIEvent(AGUIEventType.STEP_FINISHED, {"step": "generating_ui"}))
        # Step 3: pause for human approval when the router asked for it
        if routing.get("needs_approval") and surface:
            self.stream.emit(AGUIEvent(AGUIEventType.INTERRUPT, {
                "reason": "ui_confirmation",
                "description": f"Generated {len(surface.components)} component UI. Render it?",
                "options": ["render", "regenerate", "cancel"],
            }))
            console.print("\n  [bold yellow]INTERRUPT:[/] UI generated, awaiting human approval...")
            console.print("  [green]→ Auto-approving for demo...[/]")
        # Step 4: render the surface and publish it as a custom event
        if surface:
            self.state.apply_delta([
                {"op": "replace", "path": "/pipeline/stage", "value": "rendering"},
                {"op": "replace", "path": "/renders", "value": self.state.state.get("renders", 0) + 1},
            ])
            console.print(f"\n[bold green]Rendered Interface ({len(surface.components)} components):[/]\n")
            registry.render_tree(surface)
            self.stream.emit(AGUIEvent(AGUIEventType.CUSTOM, {
                "subtype": "a2ui_surface",
                "surface": surface.to_messages(),
            }))
        self.state.apply_delta([{"op": "replace", "path": "/pipeline/stage", "value": "complete"}])
        self.stream.emit(AGUIEvent(AGUIEventType.RUN_FINISHED, {"run_id": run_id, "status": "success"}))
        console.print()
        # Summarize how many events of each type the run emitted
        event_counts = {}
        for e in self.stream.events:
            event_counts[e.type.value] = event_counts.get(e.type.value, 0) + 1
        t = Table(title="Pipeline Event Summary", box=box.ROUNDED)
        t.add_column("Event Type", style="cyan")
        t.add_column("Count", justify="center", style="green")
        for etype, count in sorted(event_counts.items()):
            t.add_row(etype, str(count))
        console.print(t)
pipeline = AgenticUIPipeline()
console.print("\n[bold]Demo 1: Agent builds a settings form[/]")
pipeline.run(
    "Create a notification settings panel where I can toggle email/SMS/push, "
    "set quiet hours, and choose a notification sound."
)
# Reset the event stream and shared state so the second run starts clean
pipeline.stream = AGUIEventStream()
pipeline.state = SharedState({"pipeline": {"stage": "idle"}, "renders": 0})
console.print("\n[bold]Demo 2: Agent builds an order tracking dashboard[/]")
pipeline.run(
    "Show order #ORD-7742 status: shipped via FedEx, tracking 789456123, "
    "estimated delivery March 24, 2 of 3 items delivered. Show a progress bar "
    "and action buttons for 'Contact Support' and 'Request Refund'."
)
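The run above emits an INTERRUPT with three options and then auto-approves for the demo. As a minimal sketch of how those options might be dispatched once a real decision arrives, the block below takes the decision and the generator as callables. The names `InterruptRequest`, `resolve_interrupt`, `decide`, `generate`, and `max_regenerations` are hypothetical and are not part of the tutorial's pipeline or of the AG-UI specification.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class InterruptRequest:
    # Hypothetical container mirroring the INTERRUPT payload used in the tutorial
    reason: str
    description: str
    options: list[str]


def resolve_interrupt(
    request: InterruptRequest,
    decide: Callable[[InterruptRequest], str],
    generate: Callable[[], Optional[dict]],
    max_regenerations: int = 2,
) -> Optional[dict]:
    """Return the surface to render, or None if the run should stop."""
    surface = generate()
    regenerations = 0
    while True:
        choice = decide(request)
        if choice not in request.options:
            raise ValueError(f"unknown choice: {choice!r}")
        if choice == "render":
            return surface
        if choice == "cancel":
            return None
        # "regenerate": stop once the budget is spent instead of looping forever
        if regenerations >= max_regenerations:
            return None
        regenerations += 1
        surface = generate()


# Usage with a scripted reviewer standing in for a real UI prompt
request = InterruptRequest(
    reason="ui_confirmation",
    description="Generated UI. Render it?",
    options=["render", "regenerate", "cancel"],
)
scripted = iter(["regenerate", "render"])
approved = resolve_interrupt(request, lambda req: next(scripted), lambda: {"components": []})
```

Returning `None` when the regeneration budget runs out is a deliberate fail-closed choice in this sketch: nothing is rendered unless a human explicitly picks "render".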
hdr(7, "Incremental UI Updates: Live Surface Modification",
    "A2UI surfaces are incrementally updateable. The agent can add, remove,\n"
    "or modify components and data bindings on a live surface without\n"
    "regenerating the whole tree. Essential for real-time collaboration.")
class LiveSurface:
    """Wraps an A2UISurface and applies incremental updates, logging each one."""
    def __init__(self, surface: A2UISurface):
        self.surface = surface
        self.update_log: list[str] = []
    def add_component(self, component: A2UIComponent, parent_id: Optional[str] = None):
        self.surface.components.append(component)
        if parent_id:
            # Link the new component into its parent's child list
            for c in self.surface.components:
                if c.id == parent_id:
                    c.children.append(component.id)
                    break
        self.update_log.append(f"ADD {component.type}#{component.id} → parent:{parent_id or 'root'}")
    def update_component(self, component_id: str, new_props: dict):
        for c in self.surface.components:
            if c.id == component_id:
                c.properties.update(new_props)
                self.update_log.append(f"UPD #{component_id} props: {list(new_props.keys())}")
                return
        self.update_log.append(f"ERR #{component_id} not found")
    def remove_component(self, component_id: str):
        self.surface.components = [c for c in self.surface.components if c.id != component_id]
        # Drop dangling references from any parent that listed the removed component
        for c in self.surface.components:
            if component_id in c.children:
                c.children.remove(component_id)
        self.update_log.append(f"DEL #{component_id}")
    def update_data(self, path: str, value: Any):
        self.surface.data_model.data = _set_nested(self.surface.data_model.data, path, value)
        self.update_log.append(f"DATA {path} = {value}")
def _set_nested(d: dict, path: str, value: Any) -> dict:
    # Split a JSON Pointer-style path ("/a/b") into keys, skipping empty segments
    parts = [p for p in path.split("/") if p]
    d = copy.deepcopy(d)  # work on a copy so the caller's dict is not mutated
    current = d
    for p in parts[:-1]:
        current = current.setdefault(p, {})
    if parts:
        current[parts[-1]] = value
    return d
console.print("\n[bold]Demo: Live collaborative editing, agent modifies UI in real time[/]\n")
initial = A2UISurface(
    surface_id="task-board",
    components=[
        A2UIComponent("board", "card", {"title": "Sprint Board"}, children=["t1", "t2", "t3"]),
        A2UIComponent("t1", "chip", {"label": "AUTH-101: Login flow", "variant": "in_progress"}),
        A2UIComponent("t2", "chip", {"label": "AUTH-102: OAuth setup", "variant": "todo"}),
        A2UIComponent("t3", "chip", {"label": "AUTH-103: 2FA", "variant": "todo"}),
    ],
    data_model=A2UIDataModel({"sprint": {"name": "Sprint 14", "velocity": 21}}),
)
live = LiveSurface(initial)
console.print("[bold]Initial board:[/]")
registry.render_tree(live.surface)
console.print("\n[bold yellow]Agent updating board in real-time...[/]\n")
live.update_component("t1", {"variant": "done", "label": "AUTH-101: Login flow"})
live.update_component("t2", {"variant": "in_progress", "label": "AUTH-102: OAuth setup"})
live.add_component(
    A2UIComponent("t4", "chip", {"label": "AUTH-104: Password reset", "variant": "todo"}),
    parent_id="board"
)
live.update_data("/sprint/velocity", 25)
live.remove_component("t3")
console.print("[bold]Updated board:[/]")
registry.render_tree(live.surface)
console.print()
t = Table(title="Incremental Update Log", box=box.ROUNDED)
t.add_column("#", style="cyan", width=4, justify="center")
t.add_column("Operation", style="yellow")
for i, entry in enumerate(live.update_log, 1):
    t.add_row(str(i), entry)
console.print(t)
We wire every piece together into a single AgenticUIPipeline class that takes a user query, classifies its intent with an LLM router, generates an A2UI surface, streams the entire process over AG-UI events, manages shared state, and renders the result, so the full architecture is exercised in a single run. If the router's output cannot be parsed as JSON, the pipeline falls back to a default dashboard intent, and when the router flags the request as needing approval, it emits an INTERRUPT before rendering (auto-approved in this demo). We then build a LiveSurface class that supports incremental A2UI updates: adding, modifying, and removing components on an already-rendered surface without regenerating the whole tree, which is essential for real-time collaborative experiences. We demo this with a sprint board that an agent updates live, marking tasks complete, adding new ones, and adjusting data model values, with every change recorded in an operation log.
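To make the incremental-update idea concrete outside the tutorial's classes, here is a minimal sketch that expresses each change as a plain dictionary and replays it on a flat list of component dictionaries, the way a client-side replica might. The operation names (`add`, `update`, `remove`) and the functions `apply_op` and `replay` are assumptions made for illustration; they are not taken from the A2UI specification.

```python
import copy
from typing import Any


def apply_op(components: list[dict], op: dict[str, Any]) -> list[dict]:
    """Apply one update to a flat component list and return a new list."""
    comps = copy.deepcopy(components)  # never mutate the caller's list
    kind = op["op"]
    if kind == "add":
        new_component = copy.deepcopy(op["component"])
        comps.append(new_component)
        parent_id = op.get("parent_id")
        if parent_id is not None:
            for c in comps:
                if c["id"] == parent_id:
                    c.setdefault("children", []).append(new_component["id"])
                    break
    elif kind == "update":
        for c in comps:
            if c["id"] == op["id"]:
                c.setdefault("properties", {}).update(op["properties"])
                break
        else:
            raise KeyError(f"component {op['id']!r} not found")
    elif kind == "remove":
        comps = [c for c in comps if c["id"] != op["id"]]
        for c in comps:
            if op["id"] in c.get("children", []):
                c["children"].remove(op["id"])
    else:
        raise ValueError(f"unknown op: {kind!r}")
    return comps


def replay(components: list[dict], ops: list[dict]) -> list[dict]:
    """Apply a sequence of updates in order."""
    for op in ops:
        components = apply_op(components, op)
    return components


# Usage: the same sprint-board changes, expressed as serializable operations
board = [
    {"id": "board", "type": "card", "properties": {"title": "Sprint Board"},
     "children": ["t1", "t2", "t3"]},
    {"id": "t1", "type": "chip", "properties": {"variant": "in_progress"}},
    {"id": "t2", "type": "chip", "properties": {"variant": "todo"}},
    {"id": "t3", "type": "chip", "properties": {"variant": "todo"}},
]
ops = [
    {"op": "update", "id": "t1", "properties": {"variant": "done"}},
    {"op": "add", "parent_id": "board",
     "component": {"id": "t4", "type": "chip", "properties": {"variant": "todo"}}},
    {"op": "remove", "id": "t3"},
]
updated = replay(board, ops)
```

Because each operation is plain JSON-compatible data, the same list could be logged, sent over the event stream, or replayed later. Unlike `LiveSurface.update_component`, which logs an error entry, this sketch raises on an unknown component id.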
hdr(8, "Reference: The Agentic UI Protocol Stack",
    "How AG-UI, A2UI, MCP, and A2A fit together in the modern agent architecture.")
console.print(Panel("""
               [bold white]THE AGENTIC UI STACK (2026)[/]
┌──────────────────────────────────────────────────────┐
│ [bold cyan]USER INTERFACE[/] (React, Flutter, SwiftUI, Terminal)   │
│ Renders native widgets from component specs          │
└────────────────────────┬─────────────────────────────┘
                         │ A2UI component trees (JSON)
                         │ AG-UI events (SSE / WebSocket)
┌────────────────────────┴─────────────────────────────┐
│ [bold yellow]AG-UI PROTOCOL[/] (Agent ↔ User Interaction)            │
│ • Event streaming (TEXT, TOOL_CALL, STATE, INTERRUPT)│
│ • Bidirectional state sync (SNAPSHOT + DELTA)        │
│ • Human-in-the-loop (INTERRUPT → approval flow)      │
└────────────────────────┬─────────────────────────────┘
                         │
┌────────────────────────┴─────────────────────────────┐
│ [bold magenta]AGENT RUNTIME[/] (LangGraph, CrewAI, custom, etc.)      │
│ • Generates A2UI surfaces (Generative UI)            │
│ • Manages shared state                               │
│ • Orchestrates sub-agents via A2A protocol           │
│ • Accesses tools via MCP protocol                    │
└────────────────────────┬─────────────────────────────┘
                         │
┌────────────────────────┴─────────────────────────────┐
│ [bold green]LLM BACKBONE[/] (GPT, Claude, Gemini, etc.)             │
│ • Generates component trees as structured output     │
│ • Reasons about UI patterns per context              │
│ • Streams tokens for real-time rendering             │
└──────────────────────────────────────────────────────┘
[dim]Protocol roles:
  AG-UI = Agent ↔ User   (streaming events, state, HITL)
  A2UI  = Agent → UI     (declarative component specs)
  A2A   = Agent → Agent  (delegation, sub-agents)
  MCP   = Agent → Tools  (function calling, context)[/]
""", title="[bold]Architecture Reference", border_style="cyan"))
ref = Table(title="Agentic UI Concepts: Quick Reference", box=box.DOUBLE_EDGE, show_lines=True)
ref.add_column("Concept", style="bold cyan", width=22)
ref.add_column("What It Does", style="white", width=35)
ref.add_column("Key Mechanism", style="yellow", width=28)
ref.add_row("AG-UI Events", "Stream agent actions to frontend in real-time", "SSE/WebSocket + ~16 event types")
ref.add_row("A2UI Components", "Declarative UI trees: safe, portable, native", "Flat JSON + widget registry")
ref.add_row("State Sync", "Keep agent & UI state in lockstep", "STATE_SNAPSHOT + STATE_DELTA")
ref.add_row("Generative UI", "LLM generates UI at runtime, not just text", "A2UI JSON as structured output")
ref.add_row("INTERRUPT (HITL)", "Pause execution for human approval", "INTERRUPT event → approval flow")
ref.add_row("Incremental Update", "Modify live surfaces without full regeneration", "A2UI updateComponents message")
ref.add_row("Data Binding", "UI reads from a shared data model", "JSON Pointer paths (/path/to/val)")
ref.add_row("Widget Registry", "Client maps abstract types to native widgets", "Catalog of trusted components")
console.print(ref)
console.print(Panel(
    "[bold green]Tutorial complete![/]\n\n"
    "[dim]What you built:[/]\n"
    " • A full AG-UI event system with all 16 event types\n"
    " • An A2UI renderer with flat adjacency-list components + data binding\n"
    " • LLM-powered Generative UI that creates interfaces from natural language\n"
    " • Bidirectional state sync with JSON Patch deltas\n"
    " • Human-in-the-loop interrupt and approval flows\n"
    " • Incremental live surface updates\n\n"
    "[dim]To go further:[/]\n"
    " • Serve AG-UI events over real SSE with FastAPI\n"
    " • Connect to CopilotKit React components for a real frontend\n"
    " • Use Pydantic AI's AGUIAdapter for production agent hosting\n"
    " • Add A2A protocol for multi-agent delegation\n"
    " • Deploy on AWS Bedrock AgentCore with native AG-UI support",
    title="[bold]What's Next?",
    border_style="green",
    padding=(1, 2),
))
We close with a visual protocol stack diagram showing how AG-UI, A2UI, A2A, and MCP fit together in the modern agentic architecture, from the LLM backbone at the bottom to the native UI at the top. We provide a quick-reference table that maps every concept we built (event streaming, component trees, state sync, generative UI, interrupts, incremental updates, data binding, and widget registries) to its core mechanism. We also point the way forward to production: serving AG-UI events over real SSE with FastAPI, connecting to CopilotKit React components, using Pydantic AI's AGUIAdapter, and deploying on AWS Bedrock AgentCore.
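As a first step toward the SSE suggestion, the sketch below shows how events could be framed as Server-Sent Events using only the standard library. The function names are hypothetical, and the JSON shape (the event type stored under a `type` key next to the tutorial's snake_case payload fields) is an assumption; the real AG-UI wire format may use different field names. In a FastAPI app, a generator like `sse_stream` could be passed to a `StreamingResponse` with `media_type="text/event-stream"`.

```python
import json
from typing import Iterable, Iterator


def to_sse_frame(event_type: str, payload: dict) -> str:
    """Serialize one event as an SSE frame: a data line followed by a blank line."""
    # The event type is written last so a stray "type" key in the payload cannot override it
    body = json.dumps({**payload, "type": event_type}, separators=(",", ":"))
    # json.dumps escapes newlines inside strings, so the body always fits on one data line
    return f"data: {body}\n\n"


def sse_stream(events: Iterable[tuple[str, dict]]) -> Iterator[str]:
    """Lazily turn (event_type, payload) pairs into SSE frames."""
    for event_type, payload in events:
        yield to_sse_frame(event_type, payload)


def parse_sse_frame(frame: str) -> dict:
    """Client-side counterpart: recover the JSON object from one frame."""
    data_lines = []
    for line in frame.splitlines():
        if line.startswith("data:"):
            value = line[len("data:"):]
            # SSE strips a single leading space after the colon
            data_lines.append(value[1:] if value.startswith(" ") else value)
    return json.loads("\n".join(data_lines))


# Usage: frame two events from a run and print them as they would go over the wire
demo_events = [
    ("RUN_STARTED", {"run_id": "a1b2c3d4"}),
    ("RUN_FINISHED", {"run_id": "a1b2c3d4", "status": "success"}),
]
for frame in sse_stream(demo_events):
    print(frame, end="")
```

Keeping the framing separate from the transport means the same functions can be checked without a running server.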
In conclusion, we have a fully functional Agentic UI pipeline that takes a simple natural-language query and transforms it into a structured, interactive interface powered by an intelligent agent. We don't just assemble components; we understand how each layer operates and connects, from real-time AG-UI event streaming and declarative A2UI interface definitions to state synchronization through JSON Patch and enforced human-in-the-loop safety mechanisms. This clarity allows us to reason about system behavior, debug effectively, and extend functionality without relying on black-box abstractions. We also leave with the ability to design our own agent-driven UI systems, adapt them to different use cases, and confidently build production-ready experiences where agents and interfaces evolve together in a controlled, transparent, and scalable way.
The post A Coding Deep Dive into Agentic UI, Generative UI, State Synchronization, and Interrupt-Driven Approval Flows appeared first on MarkTechPost.
