How to Build a Fully Offline Multi-Tool Reasoning Agent with Dynamic Planning, Error Recovery, and Intelligent Function Routing
In this tutorial, we explore how to build a fully offline, multi-step reasoning agent that uses the Instructor library to generate structured outputs and reliably orchestrate complex tool calls. We design an agent capable of selecting the right tool, validating inputs, planning multi-stage workflows, and recovering from errors. We bring together Instructor, Transformers, and carefully crafted Pydantic schemas to create an adaptive system that mirrors real-world agentic AI behavior.
import subprocess
import sys


def install_dependencies():
    """Install the packages the agent needs (bitsandbytes only when a GPU is present)."""
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas",
    ]
    # torch may not be installed yet, so guard the GPU check.
    try:
        import torch
        has_gpu = torch.cuda.is_available()
    except ImportError:
        has_gpu = False
    if has_gpu:
        packages.append("bitsandbytes")
        print("GPU detected - installing quantization support")
    else:
        print("No GPU detected - will use CPU (slower but works)")
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])


# Install only if the Instructor library is not already importable.
try:
    import instructor
except ImportError:
    print("Installing dependencies...")
    install_dependencies()
    print("Installation complete!")

from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import re
We set up the environment by installing all required dependencies and importing the core libraries. This lays the foundation for the system and ensures that everything, from Instructor to Transformers, is ready for offline execution, giving us a clean and reliable base for building the agent.
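Before triggering a full install, it can help to check which packages are already importable. The following is a minimal sketch using only the standard library. The helper name `missing_packages` is our own and not part of the tutorial's code, and it assumes each pip package name matches its import name, which holds for the packages listed above but not in general.

```python
import importlib.util
from typing import Iterable, List


def missing_packages(names: Iterable[str]) -> List[str]:
    """Return the names that cannot be found as importable top-level modules."""
    # find_spec returns None when a top-level module cannot be located,
    # and it does not execute the module, so this check is cheap.
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    required = ["instructor", "transformers", "torch", "accelerate", "pydantic"]
    print("Missing:", missing_packages(required))
```

A check like this avoids importing `torch` just to decide whether an install is needed.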
class SQLQuery(BaseModel):
    """Complex SQL generation with validation"""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @validator('columns')
    def validate_columns(cls, v):
        if not v:
            raise ValueError("Must specify at least one column")
        return v


class DataTransformation(BaseModel):
    """Schema for complex data pipeline operations"""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference to data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]


class APIRequest(BaseModel):
    """Multi-endpoint API orchestration"""
    endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)


class CodeGeneration(BaseModel):
    """Generate and validate code snippets"""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="The generated code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @validator('code')
    def validate_code_safety(cls, v, values):
        # Reject Python snippets that contain obviously risky calls.
        dangerous = ['eval(', 'exec(', '__import__', 'os.system']
        if values.get('language') == 'python':
            if any(d in v for d in dangerous):
                raise ValueError("Code contains potentially dangerous operations")
        return v


class MultiToolPlan(BaseModel):
    """Plan for multi-step tool execution"""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
    dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Seconds")


class ToolCall(BaseModel):
    """Enhanced tool selection with context"""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
                       "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
                               CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False


class ExecutionResult(BaseModel):
    """Rich result with metadata"""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
We define the advanced Pydantic schemas that structure how our agent understands SQL queries, data pipelines, API calls, code generation, and multi-step plans. These models give the agent strong validation, safety checks, and clarity when interpreting complex instructions, and they form the backbone of its reasoning process.
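The code-safety validator in the schema relies on substring matching, which is simple but can over-match (for example, a function named `my_eval(` contains `eval(`). As a possible refinement, here is a sketch that inspects the parsed syntax tree instead. It is not part of the original agent: the function name `find_dangerous_calls` and the exact blocklist are assumptions chosen to mirror the validator's list, and a check like this is a heuristic, not a sandbox.

```python
import ast
from typing import List

# Assumed blocklist, mirroring the substrings used by the schema validator.
DANGEROUS_NAMES = {"eval", "exec", "__import__"}
DANGEROUS_ATTRS = {("os", "system")}


def find_dangerous_calls(source: str) -> List[str]:
    """Return the names of flagged calls found in Python source code.

    Raises SyntaxError if the source cannot be parsed.
    """
    found = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Direct calls such as eval(...) or exec(...)
        if isinstance(func, ast.Name) and func.id in DANGEROUS_NAMES:
            found.append(func.id)
        # Attribute calls such as os.system(...)
        elif (isinstance(func, ast.Attribute)
              and isinstance(func.value, ast.Name)
              and (func.value.id, func.attr) in DANGEROUS_ATTRS):
            found.append(f"{func.value.id}.{func.attr}")
    return found
```

Because only real call nodes are matched, a helper named `my_eval` is not flagged, while `os.system('ls')` is. Aliased imports (such as `import os as o`) would still slip through, so this remains a first line of defense only.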
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    """Run a mock SELECT against in-memory tables."""
    import time
    start = time.time()
    mock_tables = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_tables.get(params.table, [])
    # Keep only rows matching every equality condition.
    if params.where_conditions:
        data = [row for row in data if all(
            row.get(k) == v for k, v in params.where_conditions.items()
        )]
    # Project the requested columns.
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation simplified in mock mode")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_affected": len(data), "query_type": "SELECT"}
    )
def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    """Apply a mock filter, aggregate, or normalize operation to sample data."""
    import time
    start = time.time()
    operations = {
        "filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        # Scale numeric values only; non-numeric fields (e.g. "category") pass through.
        "normalize": lambda d, p: [
            {k: v / p.get('factor', 1) if isinstance(v, (int, float)) else v
             for k, v in x.items()}
            for x in d
        ]
    }
    mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
    op_func = operations.get(params.operation)
    if op_func:
        result_data = op_func(mock_data, params.parameters)
    else:
        # Unsupported operations ("join", "pivot") return the data unchanged.
        result_data = mock_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_rows": len(mock_data)}
    )
def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    """Simulate calling each endpoint and return mock responses."""
    import time
    start = time.time()
    results = []
    warnings = []
    for i, endpoint in enumerate(params.endpoints):
        # Simulate a transient failure on the second endpoint in retry mode.
        if params.error_handling == "retry" and i == 1:
            warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
        results.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "data": f"Mock response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=results,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
    )
def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    """Package generated code and flag long or untested snippets."""
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code is quite long, consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )
def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    """Sanity-check a multi-step plan and echo it back."""
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan has many steps, consider breaking into sub-plans")
    # Only catches a step that lists itself as a dependency.
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan": params.steps, "estimated_time": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )
# Registry mapping tool names (as used in ToolCall.tool_name) to functions.
TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}
We implement the actual tools: SQL execution, data transformation, API orchestration, code validation, and planning. Each tool function simulates a realistic workflow with controlled, mocked outputs and warnings, which lets us test the agent's decision-making in an environment that mirrors real-world tasks.
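The planner's circular-dependency warning only fires when a step lists itself as a dependency, so longer cycles such as a → b → a pass unnoticed. A fuller check can be sketched with a topological sort (Kahn's algorithm). The helper `execution_order` below is hypothetical and assumes the same `Dict[str, List[str]]` shape as the plan's `dependencies` field, mapping each step to the steps it depends on.

```python
from collections import deque
from typing import Dict, List, Optional


def execution_order(dependencies: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return steps in a dependency-respecting order, or None if a cycle exists."""
    # Collect every step, including ones that only appear as dependencies.
    steps = set(dependencies)
    for deps in dependencies.values():
        steps.update(deps)

    remaining = {s: set(dependencies.get(s, [])) for s in steps}
    dependents: Dict[str, List[str]] = {s: [] for s in steps}
    for step, deps in remaining.items():
        for dep in deps:
            dependents[dep].append(step)

    # Start with steps that have no unmet dependencies (sorted for stable output).
    ready = deque(sorted(s for s, deps in remaining.items() if not deps))
    order: List[str] = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in sorted(dependents[step]):
            remaining[nxt].discard(step)
            if not remaining[nxt]:
                ready.append(nxt)

    # Any step left unscheduled is part of (or blocked by) a cycle.
    return order if len(order) == len(steps) else None
```

For a plan like extract → transform → report → email, this yields the steps in that order, and it returns `None` for both self-loops and multi-step cycles.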
class AdvancedToolAgent:
    """Agent with complex reasoning, error recovery, and multi-step planning"""

    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("GPU detected - using 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
            model_kwargs["quantization_config"] = quantization_config
        else:
            print("CPU mode - using smaller model for better performance")
            # NOTE: flan-t5 is an encoder-decoder model, so AutoModelForCausalLM
            # may refuse to load it; a small causal LM may be needed here instead.
            model_name = "google/flan-t5-base"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_kwargs
        )
        self.pipe = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer,
            max_new_tokens=768, temperature=0.7, do_sample=True
        )
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("Agent initialized!")
    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        """Ask the model to pick a tool and return a validated ToolCall."""
        tool_descriptions = """
Advanced Tools:
- sql_engine: Execute complex SQL queries with joins, aggregations, filtering
- data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
- api_orchestrator: Call multiple APIs with dependencies, retries, error handling
- code_generator: Generate safe, validated code with tests in multiple languages
- planner: Create multi-step execution plans with dependency management
- none: Answer directly using reasoning
"""
        prompt = f"""{tool_descriptions}
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
        return self.client(prompt, response_model=ToolCall)
    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        """Run the selected tool, retrying up to max_retries times on exceptions."""
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(
                        success=True, data="Direct response", execution_time=0.0,
                        warnings=[], metadata={}
                    )
                tool_func = TOOLS.get(tool_call.tool_name)
                if not tool_func:
                    return ExecutionResult(
                        success=False, data=None, execution_time=0.0,
                        warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
                    )
                result = tool_func(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as e:
                if attempt < max_retries:
                    print(f"Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(
                    success=False, data=None, execution_time=0.0,
                    warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
                    metadata={"error": str(e)}
                )
We construct the agent itself: loading the model, building the routing pipeline, and implementing recovery logic. The tool-selection and execution methods let the agent interpret queries, choose a strategy, and handle failures gracefully.
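The recovery logic boils down to a bounded retry loop. The standalone sketch below isolates that pattern so it can be tried without loading a model. The names `call_with_retries` and `make_flaky` are illustrative and not part of the agent. One assumption worth keeping in mind: retrying only helps with transient failures, since a deterministic tool given the same input will fail the same way each time.

```python
from typing import Any, Callable, Tuple


def call_with_retries(func: Callable[[], Any], max_retries: int = 2) -> Tuple[bool, Any, int]:
    """Call func up to max_retries + 1 times.

    Returns (success, value_or_error_message, attempts_made).
    """
    last_error = None
    for attempt in range(1, max_retries + 2):
        try:
            return True, func(), attempt
        except Exception as exc:  # broad on purpose: any failure triggers a retry
            last_error = exc
    return False, str(last_error), max_retries + 1


def make_flaky(failures: int) -> Callable[[], str]:
    """Build a function that raises `failures` times, then returns 'ok'."""
    state = {"calls": 0}

    def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError(f"transient failure {state['calls']}")
        return "ok"

    return flaky


if __name__ == "__main__":
    print(call_with_retries(make_flaky(2), max_retries=2))
    print(call_with_retries(make_flaky(5), max_retries=2))
```

The first call succeeds on its third attempt; the second exhausts its attempts and reports the last error message.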
    # Method of AdvancedToolAgent (continues the class defined above).
    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        """Route a query to a tool, execute it, and report the outcome."""
        if verbose:
            print(f"\n{'='*70}")
            print(f"Complex Query: {user_query}")
            print(f"{'='*70}")
        if verbose:
            print("\nStep 1: Analyzing query complexity & routing...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f"  → Tool: {tool_call.tool_name}")
            print(f"  → Confidence: {tool_call.confidence:.2%}")
            print(f"  → Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print("Requires human approval!")
        if verbose:
            print("\nStep 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f"  → Success: {result.success}")
            print(f"  → Execution time: {result.execution_time:.3f}s")
            if result.warnings:
                print(f"  → Warnings: {', '.join(result.warnings)}")
            print(f"  → Data preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print("\nMetadata:")
            for key, value in result.metadata.items():
                print(f"  • {key}: {value}")
        if verbose:
            print(f"\n{'='*70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }
def main():
    agent = AdvancedToolAgent()
    hard_queries = [
        "Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
        "Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
        "I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
        "Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
        "Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
    ]
    print("\n" + " HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
    for i, query in enumerate(hard_queries, 1):
        print(f"\n{'#'*70}")
        print(f"# CHALLENGE {i}/{len(hard_queries)}")
        print(f"{'#'*70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"Critical error: {e}\n")
    print("\n" + f" COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
    # Guard against an empty history to avoid dividing by zero.
    if agent.execution_history:
        successes = sum(1 for h in agent.execution_history if h['success'])
        print(f"Success rate: {successes / len(agent.execution_history) * 100:.1f}%")


if __name__ == "__main__":
    main()
We tie everything together with a run() method and a demo main() function that executes several hard-mode queries. As the agent analyzes, routes, executes, and reports results, we see the full architecture in action and how the system performs under complex, realistic scenarios.
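The demo prints a single success rate at the end. For a slightly richer summary, the recorded history entries can be aggregated per tool. This is a sketch under the assumption that each entry is a dict holding the tool name under `"tool"` and a boolean under `"success"`; the function name `summarize_history` is our own.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize_history(history: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate run counts and success rates overall and per tool."""
    per_tool: Dict[str, Dict[str, int]] = defaultdict(lambda: {"runs": 0, "successes": 0})
    for entry in history:
        stats = per_tool[entry["tool"]]
        stats["runs"] += 1
        stats["successes"] += int(bool(entry["success"]))

    total = len(history)
    successes = sum(s["successes"] for s in per_tool.values())
    return {
        "total_runs": total,
        # None (rather than a ZeroDivisionError) when nothing has run yet.
        "success_rate": successes / total * 100 if total else None,
        "per_tool": dict(per_tool),
    }
```

Returning `None` for an empty history keeps the summary safe to call even when every query failed before reaching a tool.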
In conclusion, we have built an agent capable of understanding intricate instructions, routing execution across multiple tools, and gracefully recovering from errors, all within a compact, offline system. Testing it on challenging queries shows it planning, reasoning, and executing with clarity and structure. Modular schemas, validated tool calls, and layered execution logic are what allow us to create agents that behave reliably in complex environments.
The post How to Build a Fully Offline Multi-Tool Reasoning Agent with Dynamic Planning, Error Recovery, and Intelligent Function Routing appeared first on MarkTechPost.
