How to Build a Document Intelligence Backend with iii Using Workers, Functions, and Cron Triggers
In this tutorial, we build a document-intelligence workflow with iii. We start by installing the iii engine and Python SDK, then start the engine as a background process and connect a Python worker to it. After the setup, we register separate functions for text normalization, tokenization, sentiment analysis, keyword extraction, reporting, and heartbeat monitoring. We then combine these functions into a single analysis pipeline and run the same logic through direct invocation, an HTTP endpoint, fire-and-forget execution, and a scheduled cron trigger. Along the way, we also track basic runtime state, making the workflow feel closer to a real backend system than a static notebook demo.
import os, sys, subprocess, time, socket, json, threading
from collections import Counter
HOME = os.path.expanduser("~")
BIN_DIR = f"{HOME}/.local/bin"
os.environ["PATH"] = BIN_DIR + os.pathsep + os.environ.get("PATH", "")
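def sh(cmd):
    # Echo the command, then run it; check=True raises if it exits non-zero.
    print(f"$ {cmd}")
    subprocess.run(cmd, shell=True, check=True)
# Install the engine only when the binary is not already present.
if not os.path.exists(f"{BIN_DIR}/iii"):
    sh(f"curl -fsSL https://install.iii.dev/iii/main/install.sh | BIN_DIR={BIN_DIR} sh")
sh(f"{sys.executable} -m pip install -q iii-sdk requests")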
III = f"{BIN_DIR}/iii"
sh(f"{III} --version")
We begin by importing the required Python modules and setting up the local binary path for the iii engine. We define a small helper function to run shell commands and install the iii engine if it isn’t already available. We also install the Python SDK and the requests package, then verify the iii installation by checking its version.
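The `sh` helper depends on `check=True`, which makes `subprocess.run` raise `CalledProcessError` when a command exits with a non-zero status. The sketch below illustrates that behavior with a hypothetical variant, `run_checked`, which is not part of the tutorial: it takes an argument list instead of a shell string and returns the captured output.

```python
import subprocess
import sys


def run_checked(args):
    """Run a command given as an argument list and return its stdout.

    Hypothetical helper for illustration; the tutorial's own helper is `sh`.
    """
    completed = subprocess.run(args, check=True, capture_output=True, text=True)
    return completed.stdout.strip()


# A command that succeeds returns its output.
version_line = run_checked([sys.executable, "-c", "print('ok')"])

# A command that exits non-zero raises instead of failing silently.
try:
    run_checked([sys.executable, "-c", "import sys; sys.exit(3)"])
    failed_code = None
except subprocess.CalledProcessError as err:
    failed_code = err.returncode
```

Failing fast here matters because every later step assumes the engine binary and the SDK were installed successfully.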
WS_URL, HTTP_URL = "ws://localhost:49134", "http://localhost:3111"
engine_log = open("/tmp/iii-engine.log", "w")
engine = subprocess.Popen([III, "--use-default-config"],
stdout=engine_log, stderr=subprocess.STDOUT)
def wait_port(host, port, timeout=90):
    # Poll the TCP port until it accepts a connection or the timeout expires.
    end = time.time() + timeout
    while time.time() < end:
        with socket.socket() as s:
            s.settimeout(1)
            try:
                s.connect((host, port))
                return True
            except OSError:
                time.sleep(0.5)
    return False
assert wait_port("localhost", 49134), "engine never came up; see /tmp/iii-engine.log"
print(f"✓ engine up — WS {WS_URL} | HTTP {HTTP_URL}")
from iii import register_worker
try:
    # TriggerAction is optional; older SDK builds may not export it.
    from iii import TriggerAction
except Exception:
    TriggerAction = None
worker = register_worker(WS_URL)
_STATE = {"docs_analyzed": 0, "heartbeats": 0, "keyword_totals": Counter()}
_LOCK = threading.Lock()
POSITIVE = {"good","great","love","excellent","happy","fast","reliable","amazing","best","win"}
NEGATIVE = {"bad","terrible","hate","slow","broken","sad","worst","bug","crash","fail"}
We launch the iii engine as a background process and wait for its WebSocket port to become available. We then connect a Python worker to the running engine and prepare optional support for fire-and-forget triggers. We also define a shared in-memory state, a thread lock, and simple positive and negative word sets for sentiment analysis.
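To see why the shared state is paired with a lock, here is a minimal standalone sketch. It assumes handlers may run concurrently, which the tutorial does not state explicitly about the SDK. The names `state`, `lock`, and `record` are illustrative stand-ins for `_STATE`, `_LOCK`, and the update step inside the analysis function.

```python
import threading
from collections import Counter

# Illustrative stand-ins for the tutorial's _STATE and _LOCK.
state = {"docs_analyzed": 0, "keyword_totals": Counter()}
lock = threading.Lock()


def record(keywords):
    """Fold one document's (word, count) pairs into the shared state."""
    with lock:  # the read-modify-write below must not interleave across threads
        state["docs_analyzed"] += 1
        for word, count in keywords:
            state["keyword_totals"][word] += count


# Simulate 50 concurrent handler invocations with identical keyword lists.
threads = [
    threading.Thread(target=record, args=([("engine", 2), ("worker", 1)],))
    for _ in range(50)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After all threads finish, the totals are exact because each update ran as one guarded unit.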
def normalize(data):
    # Trim whitespace and lowercase; a missing or None text becomes "".
    return {"text": (data.get("text") or "").strip().lower()}
def tokenize(data):
    text = data.get("text", "")
    # Replace punctuation with spaces, then split on whitespace.
    cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
    tokens = [t for t in cleaned.split() if t]
    return {"tokens": tokens, "count": len(tokens)}
def sentiment(data):
    toks = data.get("tokens", [])
    pos = sum(t in POSITIVE for t in toks)
    neg = sum(t in NEGATIVE for t in toks)
    score = pos - neg
    label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
    return {"label": label, "score": score, "pos": pos, "neg": neg}
def keywords(data):
    toks = data.get("tokens", [])
    stop = {"the","a","an","is","it","to","of","and","in","for","on","how"}
    # Count tokens that are not stop words and are longer than two characters.
    freq = Counter(t for t in toks if t not in stop and len(t) > 2)
    return {"keywords": freq.most_common(data.get("top_n", 5))}
def analyze(data):
    # Each step is routed through the engine rather than called directly.
    norm = worker.trigger({"function_id": "text::normalize", "payload": {"text": data.get("text","")}})
    toks = worker.trigger({"function_id": "text::tokenize", "payload": norm})
    sent = worker.trigger({"function_id": "text::sentiment", "payload": toks})
    keys = worker.trigger({"function_id": "text::keywords", "payload": {**toks, "top_n": data.get("top_n", 5)}})
    with _LOCK:
        _STATE["docs_analyzed"] += 1
        for k, c in keys["keywords"]:
            _STATE["keyword_totals"][k] += c
        n = _STATE["docs_analyzed"]
    return {"tokens": toks["count"], "sentiment": sent, "keywords": keys["keywords"], "docs_analyzed": n}
def report(data):
    with _LOCK:
        return {"docs_analyzed": _STATE["docs_analyzed"],
                "heartbeats": _STATE["heartbeats"],
                "top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)}
def http_analyze(data):
    # The HTTP trigger delivers the request body under the "body" key.
    body = data.get("body") or {}
    result = worker.trigger({"function_id": "pipeline::analyze", "payload": body})
    return {"status_code": 200, "body": result, "headers": {"Content-Type": "application/json"}}
def heartbeat(data):
    with _LOCK:
        _STATE["heartbeats"] += 1
    return {"ok": True}
for fid, fn in [
("text::normalize", normalize), ("text::tokenize", tokenize),
("text::sentiment", sentiment), ("text::keywords", keywords),
("pipeline::analyze", analyze), ("stats::report", report),
("http::analyze", http_analyze), ("cron::heartbeat", heartbeat),
]:
    worker.register_function(fid, fn)
We define the core functions used in the text-analysis workflow, including normalization, tokenization, sentiment detection, and keyword extraction. We then create an analysis function that routes each step through the iii engine instead of calling everything directly. We also add reporting, HTTP handling, and heartbeat functions before registering all of them with the worker.
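The register-then-trigger pattern can be tried without an engine. The sketch below is an in-process analogy only: `registry`, `register_function`, and `trigger` are hypothetical stand-ins that mimic the shape of the calls used above, not the iii SDK itself, and the word sets are shortened for brevity.

```python
# In-process stand-in for the engine: a dict mapping function ids to callables.
POSITIVE = {"great", "love", "fast", "reliable"}
NEGATIVE = {"slow", "broken", "terrible"}

registry = {}


def register_function(function_id, fn):
    registry[function_id] = fn


def trigger(request):
    """Look up the target by id and call it with the payload."""
    return registry[request["function_id"]](request["payload"])


def normalize(data):
    return {"text": (data.get("text") or "").strip().lower()}


def tokenize(data):
    text = data.get("text", "")
    cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
    tokens = cleaned.split()
    return {"tokens": tokens, "count": len(tokens)}


def sentiment(data):
    tokens = data.get("tokens", [])
    score = sum(t in POSITIVE for t in tokens) - sum(t in NEGATIVE for t in tokens)
    label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
    return {"label": label, "score": score}


def analyze(data):
    # Every step goes through trigger(), so each stage is addressed by id.
    norm = trigger({"function_id": "text::normalize", "payload": data})
    toks = trigger({"function_id": "text::tokenize", "payload": norm})
    sent = trigger({"function_id": "text::sentiment", "payload": toks})
    return {"tokens": toks["count"], "sentiment": sent}


for fid, fn in [
    ("text::normalize", normalize), ("text::tokenize", tokenize),
    ("text::sentiment", sentiment), ("pipeline::analyze", analyze),
]:
    register_function(fid, fn)

result = trigger({"function_id": "pipeline::analyze",
                  "payload": {"text": "  The gateway was SLOW and broken!  "}})
```

Because `analyze` refers to its steps by id rather than by Python name, any stage could be swapped out by registering a different callable under the same id.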
worker.register_trigger({"type": "http", "function_id": "http::analyze",
                         "config": {"api_path": "/analyze", "http_method": "POST"}})
cron_ok = False
try:
    worker.register_trigger({"type": "cron", "function_id": "cron::heartbeat",
                             "config": {"schedule": "*/2 * * * * *"}})
    cron_ok = True
except Exception as e:
    print("cron trigger skipped:", e)
try:
    worker.connect()
except Exception:
    pass
# Give the engine a moment to process the registrations.
time.sleep(2)
We register an HTTP trigger so that the analysis pipeline can be invoked via a POST request. We also try to register a cron trigger that runs the heartbeat function on a fixed schedule, while safely skipping it if the engine build doesn’t support that schema. We then connect the worker and pause briefly so the registered functions and triggers are ready to use.
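The schedule string `*/2 * * * * *` has six fields. Assuming the first field is seconds, as in cron dialects that add a leading seconds column (the tutorial does not say so explicitly), `*/2` means every second value. The sketch below expands a single field to make that concrete. `expand_field` is a hypothetical helper, not part of iii, and it handles only `*`, `*/n`, single values, ranges, and comma lists.

```python
def expand_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Supports '*', '*/n', 'a', 'a-b', 'a-b/n', and comma-separated lists.
    """
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, stop = low, high
        elif "-" in base:
            start_text, stop_text = base.split("-", 1)
            start, stop = int(start_text), int(stop_text)
        else:
            start = stop = int(base)
        values.update(range(start, stop + 1, step))
    return sorted(values)


# Under the seconds-first assumption, the heartbeat fires on even seconds.
seconds = expand_field("*/2", 0, 59)
```

Under that reading the heartbeat fires 30 times per minute, which would explain why a short pause is enough to observe a few heartbeats.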
print("\n=== A) Direct invocation: orchestrated through the engine ===")
docs = [
"iii makes the backend amazing and fast, I love how reliable it is",
"The legacy gateway was slow and broken, a terrible buggy experience",
"Workers register functions and triggers; the engine routes every call",
]
for d in docs:
    r = worker.trigger({"function_id": "pipeline::analyze", "payload": {"text": d, "top_n": 4}})
    print(f"  [{r['sentiment']['label']:>8}] tokens={r['tokens']:>2} keywords={r['keywords']}")
print("\n=== B) The SAME function over HTTP (:3111), zero handler changes ===")
import requests
try:
    resp = requests.post(f"{HTTP_URL}/analyze",
                         json={"text": "great great product, best ever", "top_n": 3}, timeout=10)
    print("  HTTP", resp.status_code, "->", resp.json())
except Exception as e:
    print("  HTTP call failed (engine HTTP module/version?):", e)
print("\n=== C) Fire-and-forget invocation ===")
if TriggerAction:
    worker.trigger({"function_id": "pipeline::analyze",
                    "payload": {"text": "async win, no waiting"},
                    "action": TriggerAction.Void()})
    print("  dispatched (no result awaited)")
else:
    print("  TriggerAction not in this SDK build, skipping")
print("\n=== D) Cron trigger firing on its own ===")
if cron_ok:
    # Wait long enough for the schedule to fire a few times.
    time.sleep(5)
    print("  heartbeats so far:",
          worker.trigger({"function_id": "stats::report", "payload": {}})["heartbeats"])
else:
    print("  cron not registered in this engine build")
print("\n=== E) Aggregate state report ===")
print(json.dumps(worker.trigger({"function_id": "stats::report", "payload": {}}), indent=2))
print("\nTraces/metrics: run `iii console` locally, or scrape Prometheus at :9464")
print("engine log tail:")
print(subprocess.run(["tail", "-n", "8", "/tmp/iii-engine.log"],
                     capture_output=True, text=True).stdout)
We test the complete iii workflow by sending sample text documents through the registered analysis pipeline. We then call the same logic through HTTP, try fire-and-forget execution, and check whether the cron heartbeat is running. Finally, we print the aggregate state report and show the engine log tail for basic runtime visibility.
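The difference between an awaited call and a fire-and-forget call can be sketched with the standard library alone. This is an analogy under stated assumptions, not a description of how `TriggerAction.Void()` is implemented: the `trigger` function and its `wait` flag are hypothetical, and a thread pool stands in for the engine.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

processed = []            # records what the handler has seen
done = threading.Event()  # lets the caller observe completion when it wants to


def analyze(payload):
    processed.append(payload["text"].lower())
    done.set()
    return {"ok": True}


executor = ThreadPoolExecutor(max_workers=2)


def trigger(payload, wait=True):
    """Submit the handler; block for its result only when wait is True."""
    future = executor.submit(analyze, payload)
    return future.result() if wait else None


# Awaited call: the caller receives the handler's return value.
sync_result = trigger({"text": "Awaited call"})

# Fire-and-forget: the caller gets nothing back and moves on immediately.
done.clear()
async_result = trigger({"text": "Async win"}, wait=False)

# The work still happens; here we wait only so the sketch can verify it.
finished = done.wait(timeout=5)
executor.shutdown(wait=True)
```

The practical consequence is the same as in the tutorial's step C: a fire-and-forget call updates the shared state, but its result can only be observed indirectly, for example through the state report.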
In conclusion, we have a working iii system that processes text using modular, registered functions rather than a single fixed script. We analyzed sample documents, exposed the pipeline through HTTP, tested async-style execution, tracked heartbeat activity, and printed an aggregate state report. The tutorial keeps the example readable while showing the main working pattern of iii: define functions once, register them with a worker, and reuse them through different triggers and execution paths. It also shows how small functions can be cleanly connected as the workflow grows into something more production-ready.
The post How to Build a Document Intelligence Backend with iii Using Workers, Functions, and Cron Triggers appeared first on MarkTechPost.
