How to Design an Autonomous Multi-Agent Data and Infrastructure Strategy System Using Lightweight Qwen Models for Efficient Pipeline Intelligence?
In this tutorial, we build an Agentic Data and Infrastructure Strategy system using the lightweight Qwen2.5-0.5B-Instruct model for efficient execution. We begin by creating a flexible LLM agent framework and then develop specialized agents that handle different layers of data management, from ingestion and quality assessment to infrastructure optimization. We integrate these agents into an orchestrator that coordinates their interactions, ensuring smooth multi-agent collaboration across the data pipeline. Through hands-on examples like e-commerce and IoT pipelines, we explore how autonomous decision-making can streamline complex data operations. Check out the FULL CODES here.
!pip install -q transformers torch accelerate datasets huggingface_hub
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json, time
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
class LightweightLLMAgent:
    def __init__(self, role: str, model_name: str = "Qwen/Qwen2.5-0.5B-Instruct"):
        self.role = role
        self.model_name = model_name
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Loading {model_name} for {role} agent on {self.device}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            device_map="auto"
        )
        self.conversation_history = []

    def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
        # Build a chat-formatted prompt with the agent's role as the system message.
        messages = [
            {"role": "system", "content": f"You are a {self.role} agent in a data infrastructure system."},
            {"role": "user", "content": prompt}
        ]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        with torch.no_grad():
            generated_ids = self.model.generate(
                model_inputs.input_ids,
                max_new_tokens=max_tokens,
                temperature=0.7,
                do_sample=True,
                top_p=0.95
            )
        # Strip the prompt tokens so only the newly generated completion remains.
        generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]
        response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        self.conversation_history.append({"prompt": prompt, "response": response})
        return response
We begin by setting up the lightweight LLM agent infrastructure using the Qwen2.5-0.5B-Instruct model. We load the model and tokenizer, and define a base agent class capable of handling contextual conversations and generating intelligent responses. This forms the core foundation upon which our specialized agents operate efficiently within Colab. Check out the FULL CODES here.
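Before specializing the base class, it helps to confirm that the model loads and responds. The short sanity check below is our own addition; the role string and prompt are arbitrary placeholders:

agent = LightweightLLMAgent(role="Pipeline Architect")  # any role string works here
reply = agent.generate_response("In one sentence, what is a data pipeline?", max_tokens=40)
print(reply)
print(len(agent.conversation_history))  # each call appends one prompt/response entry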
class DataIngestionAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Ingestion Specialist")

    def analyze_data_source(self, source_info: Dict) -> Dict:
        prompt = f"""Analyze this data source and suggest an ingestion strategy:
Source Type: {source_info.get('type', 'unknown')}
Volume: {source_info.get('volume', 'unknown')}
Frequency: {source_info.get('frequency', 'unknown')}
Provide a brief strategy focusing on: 1) Ingestion method, 2) Key considerations."""
        strategy = self.generate_response(prompt, max_tokens=100)
        return {"source": source_info, "strategy": strategy, "timestamp": datetime.now().isoformat()}


class DataQualityAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Data Quality Analyst")

    def assess_data_quality(self, data_sample: Dict) -> Dict:
        prompt = f"""Assess data quality for this sample:
Completeness: {data_sample.get('completeness', 'N/A')}%
Consistency: {data_sample.get('consistency', 'N/A')}%
Issues Found: {data_sample.get('issues', 0)}
Provide a brief quality assessment and top 2 recommendations."""
        assessment = self.generate_response(prompt, max_tokens=100)
        return {"assessment": assessment, "severity": self._calculate_severity(data_sample), "timestamp": datetime.now().isoformat()}

    def _calculate_severity(self, data_sample: Dict) -> str:
        # Severity is derived from the average of the completeness and consistency scores.
        completeness = data_sample.get('completeness', 100)
        consistency = data_sample.get('consistency', 100)
        avg_score = (completeness + consistency) / 2
        if avg_score >= 90: return "LOW"
        elif avg_score >= 70: return "MEDIUM"
        else: return "HIGH"
We design the Data Ingestion and Data Quality agents to focus on structured analysis of data pipelines. We let the ingestion agent determine the best approach to data flow, while the quality agent evaluates data completeness, consistency, and issues to provide actionable insights. Together, they establish the first two layers of autonomous data management. Check out the FULL CODES here.
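To try these two agents in isolation before wiring them into the orchestrator, a standalone run can look like the sketch below; the sample source description and quality numbers are invented for illustration, and note that each instantiation loads its own copy of the model:

ingestion = DataIngestionAgent()
plan = ingestion.analyze_data_source({"type": "CSV export", "volume": "2GB/day", "frequency": "hourly"})
print(plan["strategy"])

quality = DataQualityAgent()
report = quality.assess_data_quality({"completeness": 72, "consistency": 65, "issues": 40})
print(report["severity"])  # (72 + 65) / 2 = 68.5, below 70, so severity is "HIGH"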
class InfrastructureOptimizationAgent(LightweightLLMAgent):
    def __init__(self):
        super().__init__(role="Infrastructure Optimization Specialist")

    def optimize_resources(self, metrics: Dict) -> Dict:
        prompt = f"""Analyze infrastructure metrics and suggest optimizations:
CPU Usage: {metrics.get('cpu_usage', 0)}%
Memory Usage: {metrics.get('memory_usage', 0)}%
Storage: {metrics.get('storage_used', 0)}GB / {metrics.get('storage_total', 0)}GB
Query Latency: {metrics.get('query_latency', 0)}ms
Provide 2 optimization recommendations."""
        recommendations = self.generate_response(prompt, max_tokens=100)
        return {"current_metrics": metrics, "recommendations": recommendations, "priority": self._calculate_priority(metrics), "timestamp": datetime.now().isoformat()}

    def _calculate_priority(self, metrics: Dict) -> str:
        # Escalate priority when CPU or memory pressure crosses fixed thresholds.
        cpu = metrics.get('cpu_usage', 0)
        memory = metrics.get('memory_usage', 0)
        if cpu > 85 or memory > 85: return "CRITICAL"
        elif cpu > 70 or memory > 70: return "HIGH"
        else: return "NORMAL"
We develop the Infrastructure Optimization Agent to continuously analyze key metrics like CPU, memory, and storage utilization. We use it to generate intelligent optimization suggestions, helping us maintain high performance and resource efficiency. This agent ensures that our infrastructure remains responsive and scalable during data operations. Check out the FULL CODES here.
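A quick way to verify the priority thresholds is to feed the agent synthetic metrics, as in this sketch (the numbers are made up to trigger the CRITICAL branch):

optimizer = InfrastructureOptimizationAgent()
result = optimizer.optimize_resources({
    "cpu_usage": 88, "memory_usage": 60,
    "storage_used": 900, "storage_total": 1000, "query_latency": 400
})
print(result["priority"])  # cpu_usage 88 > 85, so priority is "CRITICAL"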
class AgenticDataOrchestrator:
    def __init__(self):
        print("\n" + "="*70)
        print("Initializing Agentic Data Infrastructure System")
        print("="*70 + "\n")
        self.ingestion_agent = DataIngestionAgent()
        self.quality_agent = DataQualityAgent()
        self.optimization_agent = InfrastructureOptimizationAgent()
        self.execution_log = []

    def process_data_pipeline(self, pipeline_config: Dict) -> Dict:
        results = {"pipeline_id": pipeline_config.get("id", "unknown"), "start_time": datetime.now().isoformat(), "stages": []}
        print("\n[Stage 1] Data Ingestion Analysis")
        ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get("source", {}))
        print(f"Strategy: {ingestion_result['strategy'][:150]}...")
        results["stages"].append({"stage": "ingestion", "result": ingestion_result})
        print("\n[Stage 2] Data Quality Assessment")
        quality_result = self.quality_agent.assess_data_quality(pipeline_config.get("quality_metrics", {}))
        print(f"Assessment: {quality_result['assessment'][:150]}...")
        print(f"Severity: {quality_result['severity']}")
        results["stages"].append({"stage": "quality", "result": quality_result})
        print("\n[Stage 3] Infrastructure Optimization")
        optimization_result = self.optimization_agent.optimize_resources(pipeline_config.get("infrastructure_metrics", {}))
        print(f"Recommendations: {optimization_result['recommendations'][:150]}...")
        print(f"Priority: {optimization_result['priority']}")
        results["stages"].append({"stage": "optimization", "result": optimization_result})
        results["end_time"] = datetime.now().isoformat()
        results["status"] = "completed"
        self.execution_log.append(results)
        return results

    def generate_summary_report(self) -> pd.DataFrame:
        if not self.execution_log: return pd.DataFrame()
        summary_data = []
        for log in self.execution_log:
            summary_data.append({"Pipeline ID": log["pipeline_id"], "Start Time": log["start_time"], "Status": log["status"], "Stages Completed": len(log["stages"])})
        return pd.DataFrame(summary_data)
We build an Agentic Data Orchestrator to coordinate all specialized agents under a unified workflow. We use it to manage end-to-end pipeline execution, triggering ingestion, quality checks, and optimization sequentially. By doing this, we bring structure, collaboration, and automation to the entire multi-agent system. Check out the FULL CODES here.
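Since the orchestrator appends every run to execution_log, persisting results for later auditing takes only a small helper. The function below is a hypothetical addition, not part of the original class, and the filename is arbitrary:

def export_log(orchestrator: AgenticDataOrchestrator, path: str = "pipeline_runs.json") -> None:
    # Hypothetical helper: write the accumulated run log to disk as JSON.
    with open(path, "w") as f:
        json.dump(orchestrator.execution_log, f, indent=2)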
def main():
    orchestrator = AgenticDataOrchestrator()
    print("\n" + "="*70)
    print("EXAMPLE 1: E-commerce Data Pipeline")
    print("="*70)
    ecommerce_pipeline = {
        "id": "ecommerce_pipeline_001",
        "source": {"type": "REST API", "volume": "10GB/day", "frequency": "real-time"},
        "quality_metrics": {"completeness": 87, "consistency": 92, "issues": 15},
        "infrastructure_metrics": {"cpu_usage": 78, "memory_usage": 82, "storage_used": 450, "storage_total": 1000, "query_latency": 250}
    }
    result1 = orchestrator.process_data_pipeline(ecommerce_pipeline)
    print("\n\n" + "="*70)
    print("EXAMPLE 2: IoT Sensor Data Pipeline")
    print("="*70)
    iot_pipeline = {
        "id": "iot_pipeline_002",
        "source": {"type": "Message Queue (Kafka)", "volume": "50GB/day", "frequency": "streaming"},
        "quality_metrics": {"completeness": 95, "consistency": 88, "issues": 8},
        "infrastructure_metrics": {"cpu_usage": 65, "memory_usage": 71, "storage_used": 780, "storage_total": 2000, "query_latency": 180}
    }
    result2 = orchestrator.process_data_pipeline(iot_pipeline)
    print("\n\n" + "="*70)
    print("EXECUTION SUMMARY REPORT")
    print("="*70 + "\n")
    summary_df = orchestrator.generate_summary_report()
    print(summary_df.to_string(index=False))
    print("\n" + "="*70)
    print("Tutorial Complete!")
    print("="*70)
    print("\nKey Concepts Demonstrated:")
    print("✓ Lightweight LLM agent architecture")
    print("✓ Specialized agents for different data tasks")
    print("✓ Multi-agent orchestration")
    print("✓ Infrastructure monitoring and optimization")
    print("✓ Autonomous decision-making in data pipelines")


if __name__ == "__main__":
    main()
We demonstrate our full system through two real-world examples, an e-commerce and an IoT data pipeline. We observe how each agent performs its role autonomously while contributing to a shared objective. Finally, we generate a summary report, confirming the orchestration's efficiency and the power of lightweight agentic intelligence.
In conclusion, we design and execute an intelligent, multi-agent data infrastructure framework powered by a compact open-source model. We see how independent yet cooperative agents can autonomously analyze, assess, and optimize real-world data systems. The entire setup demonstrates how lightweight LLMs can efficiently handle infrastructure intelligence, while also highlighting how agentic orchestration transforms traditional data workflows into adaptive, self-optimizing systems ready for scalable enterprise applications.
Check out the FULL CODES here.
