Google ADK Multi-Agent Pipeline Tutorial: Data Loading, Statistical Testing, Visualization, and Report Generation in Python
In this tutorial, we build an advanced data analysis pipeline using Google ADK and organize it as a practical multi-agent system for real analytical work. We set up the environment, configure secure API access, create a centralized data store, and define specialized tools for loading data, exploring datasets, running statistical tests, transforming tables, generating visualizations, and producing reports. As we move through the workflow, we connect these capabilities through a master analyst agent that coordinates specialists, letting us see how a production-style analysis system can handle end-to-end tasks in a structured, scalable way.
!pip install google-adk -q
!pip install litellm -q
!pip install pandas numpy scipy matplotlib seaborn -q
!pip install openpyxl -q
print("All packages installed!")
import os
import io
import json
import getpass
import asyncio
from datetime import datetime
from typing import Optional, Dict, Any, List
import pandas as pd
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.tools.tool_context import ToolContext
from google.genai import types
import warnings
warnings.filterwarnings("ignore")
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette("husl")
print("Libraries loaded!")
def make_serializable(obj):
    if isinstance(obj, dict):
        return {k: make_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [make_serializable(item) for item in obj]
    elif isinstance(obj, (np.integer, np.int64, np.int32)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64, np.float32)):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (np.bool_,)):
        return bool(obj)
    elif isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    elif pd.isna(obj):
        return None
    else:
        return obj

print("Serialization helper ready!")
print("=" * 60)
print("
API KEY CONFIGURATION")
print("=" * 60)
strive:
from google.colab import userdata
api_key = userdata.get('OPENAI_API_KEY')
print("
API key loaded from Colab Secrets!")
besides:
print("n
Enter your OpenAI API key (hidden enter):")
api_key = getpass.getpass("OpenAI API Key: ")
os.environ['OPENAI_API_KEY'] = api_key
if api_key and len(api_key) > 20:
print(f"
API Key configured: {api_key[:8]}...{api_key[-4:]}")
else:
print("
Invalid API key!")
MODEL = "openai/gpt-4o-mini"
print(f"
Using mannequin: {MODEL}")
class DataStore:
    _instance = None
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.datasets = {}
            cls._instance.analysis_history = []
        return cls._instance
    def add_dataset(self, name: str, df: pd.DataFrame, source: str = "unknown"):
        self.datasets[name] = {
            "data": df,
            "loaded_at": datetime.now().isoformat(),
            "source": source,
            "shape": (int(df.shape[0]), int(df.shape[1])),
            "columns": list(df.columns)
        }
        return f"Dataset '{name}' stored: {df.shape[0]} rows × {df.shape[1]} columns"
    def get_dataset(self, name: str) -> Optional[pd.DataFrame]:
        if name in self.datasets:
            return self.datasets[name]["data"]
        return None
    def list_datasets(self) -> List[str]:
        return list(self.datasets.keys())
    def log_analysis(self, analysis_type: str, dataset: str, result_summary: str):
        self.analysis_history.append({
            "timestamp": datetime.now().isoformat(),
            "type": analysis_type,
            "dataset": dataset,
            "summary": result_summary
        })

DATA_STORE = DataStore()
print("DataStore initialized!")
We install the required libraries and import all the modules needed to build the pipeline. We set the visualization style, configure the API key securely, and define the model that powers the agents. We also create the shared DataStore and the serialization helper so we can manage datasets and return clean, JSON-safe outputs throughout the workflow.
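To see why the serialization helper matters, here is a minimal, self-contained sketch (the `to_json_safe` name and the sample payload are illustrative, not part of the tutorial's code): plain `json.dumps` raises a `TypeError` on numpy scalars and arrays, so every tool output is recursively converted to plain Python types first.

```python
import json
import numpy as np
import pandas as pd

def to_json_safe(obj):
    # Recursively convert numpy/pandas values into plain Python types
    if isinstance(obj, dict):
        return {k: to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_json_safe(v) for v in obj]
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    return obj

payload = {"mean": np.float64(3.5), "counts": np.array([1, 2, 3])}
safe = to_json_safe(payload)
print(json.dumps(safe))
```

The same pattern is what lets the agents below return tool results directly as JSON-serializable dictionaries.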
def load_csv(file_path: str, dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Loading CSV: {file_path} as '{dataset_name}'")
    try:
        df = pd.read_csv(file_path)
        result = DATA_STORE.add_dataset(dataset_name, df, source=file_path)
        datasets = tool_context.state.get("loaded_datasets", [])
        if dataset_name not in datasets:
            datasets.append(dataset_name)
        tool_context.state["loaded_datasets"] = datasets
        tool_context.state["active_dataset"] = dataset_name
        summary = {
            "status": "success",
            "message": result,
            "preview": {
                "columns": list(df.columns),
                "shape": [int(df.shape[0]), int(df.shape[1])],
                "dtypes": {k: str(v) for k, v in df.dtypes.items()},
                "sample": make_serializable(df.head(3).to_dict(orient="records"))
            }
        }
        return make_serializable(summary)
    except Exception as e:
        return {"status": "error", "message": f"Failed to load CSV: {str(e)}"}
def create_sample_dataset(dataset_type: str, dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Creating sample dataset: {dataset_type} as '{dataset_name}'")
    np.random.seed(42)
    if dataset_type == "sales":
        n = 500
        dates = pd.date_range("2023-01-01", periods=n, freq="D")
        df = pd.DataFrame({
            "order_id": range(1000, 1000 + n),
            "date": dates[:n].astype(str),
            "product": np.random.choice(["Laptop", "Phone", "Tablet", "Watch", "Headphones"], n),
            "category": np.random.choice(["Electronics", "Accessories"], n, p=[0.6, 0.4]),
            "region": np.random.choice(["North", "South", "East", "West"], n),
            "quantity": np.random.randint(1, 10, n),
            "unit_price": np.random.uniform(50, 1500, n).round(2),
            "discount": np.random.choice([0.0, 0.05, 0.10, 0.15, 0.20], n),
            "customer_type": np.random.choice(["New", "Returning", "VIP"], n, p=[0.3, 0.5, 0.2])
        })
        df["revenue"] = (df["quantity"] * df["unit_price"] * (1 - df["discount"])).round(2)
        df["profit_margin"] = np.random.uniform(0.15, 0.45, n).round(3)
        df["profit"] = (df["revenue"] * df["profit_margin"]).round(2)
    elif dataset_type == "customers":
        n = 300
        df = pd.DataFrame({
            "customer_id": range(5000, 5000 + n),
            "age": np.random.randint(18, 75, n),
            "gender": np.random.choice(["M", "F", "Other"], n, p=[0.48, 0.48, 0.04]),
            "income": np.random.lognormal(10.5, 0.5, n).round(0),
            "education": np.random.choice(["High School", "Bachelor", "Master", "PhD"], n, p=[0.25, 0.45, 0.22, 0.08]),
            "membership_years": np.random.exponential(3, n).round(1),
            "total_purchases": np.random.randint(1, 100, n),
            "avg_order_value": np.random.uniform(25, 500, n).round(2),
            "satisfaction_score": np.clip(np.random.normal(7.5, 1.5, n), 1, 10).round(1),
            "churn_risk": np.random.choice(["Low", "Medium", "High"], n, p=[0.6, 0.3, 0.1])
        })
        df["lifetime_value"] = (df["total_purchases"] * df["avg_order_value"]).round(2)
    elif dataset_type == "timeseries":
        dates = pd.date_range("2022-01-01", "2024-01-01", freq="D")
        n = len(dates)
        trend = np.linspace(100, 200, n)
        seasonal = 30 * np.sin(np.linspace(0, 6 * np.pi, n))
        noise = np.random.normal(0, 10, n)
        df = pd.DataFrame({
            "date": dates.astype(str),
            "value": (trend + seasonal + noise).round(2),
            "volume": np.random.randint(1000, 10000, n),
            "category": np.random.choice(["A", "B", "C"], n)
        })
    elif dataset_type == "survey":
        n = 200
        df = pd.DataFrame({
            "respondent_id": range(1, n + 1),
            "age_group": np.random.choice(["18-24", "25-34", "35-44", "45-54", "55+"], n),
            "q1_satisfaction": np.random.randint(1, 6, n),
            "q2_likelihood_recommend": np.random.randint(0, 11, n),
            "q3_ease_of_use": np.random.randint(1, 6, n),
            "q4_value_for_money": np.random.randint(1, 6, n),
            "q5_support_quality": np.random.randint(1, 6, n),
            "response_time_mins": np.random.exponential(10, n).round(1)
        })
    else:
        return {"status": "error", "message": f"Unknown dataset type: {dataset_type}. Use: sales, customers, timeseries, survey"}
    result = DATA_STORE.add_dataset(dataset_name, df, source=f"sample_{dataset_type}")
    datasets = tool_context.state.get("loaded_datasets", [])
    if dataset_name not in datasets:
        datasets.append(dataset_name)
    tool_context.state["loaded_datasets"] = datasets
    tool_context.state["active_dataset"] = dataset_name
    return make_serializable({
        "status": "success",
        "message": result,
        "description": f"Created sample {dataset_type} dataset",
        "columns": list(df.columns),
        "shape": [int(df.shape[0]), int(df.shape[1])],
        "sample": df.head(3).to_dict(orient="records")
    })
def list_available_datasets(tool_context: ToolContext) -> dict:
    print("Listing datasets")
    datasets = DATA_STORE.list_datasets()
    if not datasets:
        return {"status": "info", "message": "No datasets loaded. Use create_sample_dataset or load_csv."}
    info = {}
    for name in datasets:
        ds = DATA_STORE.datasets[name]
        info[name] = {
            "rows": int(ds["shape"][0]),
            "columns": int(ds["shape"][1]),
            "column_names": ds["columns"]
        }
    return make_serializable({
        "status": "success",
        "datasets": info,
        "active_dataset": tool_context.state.get("active_dataset")
    })

print("Data loading tools defined!")
We define the core data-loading capabilities of the system. We create functions to load CSV files, generate sample datasets for different use cases, and list the datasets currently available in memory. We use this part to make sure our pipeline always has structured data ready for downstream analysis and agent interaction.
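The loading tools only touch the `.state` mapping on their `ToolContext`, so they can be exercised outside ADK with a simple stand-in object. A minimal sketch (the `SimpleNamespace` fake context and the `demo_sales` frame are assumptions for illustration, not ADK API):

```python
from types import SimpleNamespace
import numpy as np
import pandas as pd

# Hypothetical stand-in for ADK's ToolContext: the tools above only read/write .state
fake_ctx = SimpleNamespace(state={})

rng = np.random.default_rng(42)
df = pd.DataFrame({
    "region": rng.choice(["North", "South"], 10),
    "revenue": rng.uniform(50, 500, 10).round(2),
})

# Mimic the session-state bookkeeping that create_sample_dataset performs
loaded = fake_ctx.state.get("loaded_datasets", [])
loaded.append("demo_sales")
fake_ctx.state["loaded_datasets"] = loaded
fake_ctx.state["active_dataset"] = "demo_sales"

print(fake_ctx.state["active_dataset"], df.shape)
```

This kind of fake context is handy for unit-testing tool functions before wiring them into an agent.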
def describe_dataset(dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Describing dataset: {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    result = {
        "status": "success",
        "dataset": dataset_name,
        "overview": {
            "total_rows": int(len(df)),
            "total_columns": int(len(df.columns)),
            "numeric_columns": numeric_cols,
            "categorical_columns": categorical_cols,
            "memory_mb": round(float(df.memory_usage(deep=True).sum() / 1024 / 1024), 2),
            "duplicate_rows": int(df.duplicated().sum()),
            "missing_total": int(df.isnull().sum().sum())
        }
    }
    if numeric_cols:
        stats_dict = {}
        for col in numeric_cols:
            col_data = df[col].dropna()
            if len(col_data) > 0:
                stats_dict[col] = {
                    "count": int(len(col_data)),
                    "mean": round(float(col_data.mean()), 3),
                    "std": round(float(col_data.std()), 3),
                    "min": round(float(col_data.min()), 3),
                    "25%": round(float(col_data.quantile(0.25)), 3),
                    "50%": round(float(col_data.median()), 3),
                    "75%": round(float(col_data.quantile(0.75)), 3),
                    "max": round(float(col_data.max()), 3),
                    "skewness": round(float(col_data.skew()), 3),
                    "missing": int(df[col].isnull().sum())
                }
        result["numeric_summary"] = stats_dict
    if categorical_cols:
        cat_dict = {}
        for col in categorical_cols[:10]:
            vc = df[col].value_counts()
            cat_dict[col] = {
                "unique_values": int(df[col].nunique()),
                "top_values": {str(k): int(v) for k, v in vc.head(5).items()},
                "missing": int(df[col].isnull().sum())
            }
        result["categorical_summary"] = cat_dict
    DATA_STORE.log_analysis("describe", dataset_name, "Statistics generated")
    return make_serializable(result)
def correlation_analysis(dataset_name: str, method: str = "pearson", tool_context: ToolContext = None) -> dict:
    print(f"Correlation analysis: {dataset_name} ({method})")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    numeric_df = df.select_dtypes(include=[np.number])
    if numeric_df.shape[1] < 2:
        return {"status": "error", "message": "Need at least 2 numeric columns"}
    corr_matrix = numeric_df.corr(method=method)
    strong_corrs = []
    for i in range(len(corr_matrix.columns)):
        for j in range(i + 1, len(corr_matrix.columns)):
            col1, col2 = corr_matrix.columns[i], corr_matrix.columns[j]
            val = corr_matrix.iloc[i, j]
            if abs(val) > 0.5:
                strong_corrs.append({
                    "var1": col1,
                    "var2": col2,
                    "correlation": round(float(val), 3),
                    "strength": "strong" if abs(val) > 0.7 else "moderate"
                })
    strong_corrs.sort(key=lambda x: abs(x["correlation"]), reverse=True)
    corr_dict = {}
    for col in corr_matrix.columns:
        corr_dict[col] = {k: round(float(v), 3) for k, v in corr_matrix[col].items()}
    DATA_STORE.log_analysis("correlation", dataset_name, f"{method} correlation")
    return make_serializable({
        "status": "success",
        "method": method,
        "correlation_matrix": corr_dict,
        "strong_correlations": strong_corrs[:10],
        "insight": f"Found {len(strong_corrs)} pairs with |correlation| > 0.5"
    })
def hypothesis_test(dataset_name: str, test_type: str, column1: str,
                    column2: str = None, group_column: str = None,
                    tool_context: ToolContext = None) -> dict:
    print(f"Hypothesis test: {test_type} on {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    if column1 not in df.columns:
        return {"status": "error", "message": f"Column '{column1}' not found"}
    try:
        if test_type == "normality":
            data = df[column1].dropna()
            if len(data) > 5000:
                data = data.sample(5000)
            stat, p = stats.shapiro(data)
            return make_serializable({
                "status": "success",
                "test": "Shapiro-Wilk Normality Test",
                "column": column1,
                "statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "is_normal": bool(p > 0.05),
                "interpretation": "Data appears normally distributed" if p > 0.05 else "Data is NOT normally distributed"
            })
        elif test_type == "ttest":
            if group_column is None:
                return {"status": "error", "message": "group_column required for t-test"}
            groups = df[group_column].dropna().unique()
            if len(groups) != 2:
                return {"status": "error", "message": f"T-test needs exactly 2 groups, found {len(groups)}: {list(groups)}"}
            g1 = df[df[group_column] == groups[0]][column1].dropna()
            g2 = df[df[group_column] == groups[1]][column1].dropna()
            stat, p = stats.ttest_ind(g1, g2)
            return make_serializable({
                "status": "success",
                "test": "Independent Samples T-Test",
                "comparing": column1,
                "group1": {"name": str(groups[0]), "mean": round(float(g1.mean()), 3), "n": int(len(g1))},
                "group2": {"name": str(groups[1]), "mean": round(float(g2.mean()), 3), "n": int(len(g2))},
                "t_statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "significant": bool(p < 0.05),
                "interpretation": "Significant difference" if p < 0.05 else "No significant difference"
            })
        elif test_type == "anova":
            if group_column is None:
                return {"status": "error", "message": "group_column required for ANOVA"}
            groups_data = [grp[column1].dropna().values for _, grp in df.groupby(group_column)]
            group_names = list(df[group_column].unique())
            stat, p = stats.f_oneway(*groups_data)
            group_stats = []
            for name in group_names:
                grp_data = df[df[group_column] == name][column1].dropna()
                group_stats.append({
                    "group": str(name),
                    "mean": round(float(grp_data.mean()), 3),
                    "std": round(float(grp_data.std()), 3),
                    "n": int(len(grp_data))
                })
            return make_serializable({
                "status": "success",
                "test": "One-Way ANOVA",
                "comparing": column1,
                "across": group_column,
                "n_groups": int(len(group_names)),
                "group_statistics": group_stats,
                "f_statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "significant": bool(p < 0.05),
                "interpretation": "Significant differences among groups" if p < 0.05 else "No significant differences"
            })
        elif test_type == "chi2":
            if column2 is None:
                return {"status": "error", "message": "column2 required for chi-square test"}
            contingency = pd.crosstab(df[column1], df[column2])
            chi2, p, dof, _ = stats.chi2_contingency(contingency)
            return make_serializable({
                "status": "success",
                "test": "Chi-Square Test of Independence",
                "variables": [column1, column2],
                "chi2_statistic": round(float(chi2), 4),
                "p_value": round(float(p), 6),
                "degrees_of_freedom": int(dof),
                "significant": bool(p < 0.05),
                "interpretation": "Variables are dependent" if p < 0.05 else "Variables are independent"
            })
        else:
            return {"status": "error", "message": f"Unknown test: {test_type}. Use: normality, ttest, anova, chi2"}
    except Exception as e:
        return {"status": "error", "message": f"Test failed: {str(e)}"}
def outlier_detection(dataset_name: str, column: str, method: str = "iqr",
                      tool_context: ToolContext = None) -> dict:
    print(f"Outlier detection: {column} in {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    if column not in df.columns:
        return {"status": "error", "message": f"Column '{column}' not found"}
    data = df[column].dropna()
    if method == "iqr":
        Q1 = float(data.quantile(0.25))
        Q3 = float(data.quantile(0.75))
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outliers = data[(data < lower) | (data > upper)]
        return make_serializable({
            "status": "success",
            "method": "IQR (Interquartile Range)",
            "column": column,
            "bounds": {"lower": round(lower, 3), "upper": round(upper, 3)},
            "iqr": round(IQR, 3),
            "total_values": int(len(data)),
            "outlier_count": int(len(outliers)),
            "outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
            "outlier_examples": [round(float(x), 2) for x in outliers.head(10).tolist()]
        })
    elif method == "zscore":
        z = np.abs(stats.zscore(data))
        outliers = data[z > 3]
        return make_serializable({
            "status": "success",
            "method": "Z-Score (threshold: 3)",
            "column": column,
            "total_values": int(len(data)),
            "outlier_count": int(len(outliers)),
            "outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
            "outlier_examples": [round(float(x), 2) for x in outliers.head(10).tolist()]
        })
    return {"status": "error", "message": f"Unknown method: {method}. Use: iqr, zscore"}

print("Statistical analysis tools defined!")
We build the statistical analysis layer of the tutorial. We create functions to describe datasets, calculate correlations, run hypothesis tests, and detect outliers using standard analytical methods. We use these tools to turn raw tabular data into meaningful statistical insights that the agents can interpret and explain.
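The core of the `hypothesis_test` tool is a direct call into `scipy.stats`. A minimal sketch of the t-test branch on synthetic data (the group means and sample sizes here are made up for the demo) shows the same statistic/p-value/verdict structure the tool returns:

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
# Two synthetic groups with a genuine mean difference of ~5
group_a = rng.normal(100, 10, 200)
group_b = rng.normal(105, 10, 200)

# Independent-samples t-test, as in the tool's "ttest" branch
t_stat, p_value = stats.ttest_ind(group_a, group_b)
result = {
    "t_statistic": round(float(t_stat), 4),
    "p_value": round(float(p_value), 6),
    "significant": bool(p_value < 0.05),
}
print(result)
```

With 200 observations per group and a half-standard-deviation gap, the test is well powered, so the difference comes out significant.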
def create_visualization(dataset_name: str, chart_type: str, x_column: str,
                         y_column: str = None, color_column: str = None,
                         title: str = None, tool_context: ToolContext = None) -> dict:
    print(f"Creating {chart_type}: {x_column}" + (f" vs {y_column}" if y_column else ""))
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    if x_column not in df.columns:
        return {"status": "error", "message": f"Column '{x_column}' not found"}
    try:
        fig, ax = plt.subplots(figsize=(10, 6))
        chart_title = title or f"{chart_type.title()}: {x_column}" + (f" vs {y_column}" if y_column else "")
        if chart_type == "histogram":
            if color_column and color_column in df.columns:
                for grp in df[color_column].unique():
                    subset = df[df[color_column] == grp][x_column].dropna()
                    ax.hist(subset, alpha=0.6, label=str(grp), bins=30)
                ax.legend()
            else:
                ax.hist(df[x_column].dropna(), bins=30, edgecolor='black', alpha=0.7, color='steelblue')
            ax.set_xlabel(x_column)
            ax.set_ylabel("Frequency")
        elif chart_type == "scatter":
            if not y_column:
                return {"status": "error", "message": "y_column required for scatter"}
            if color_column and color_column in df.columns:
                for grp in df[color_column].unique():
                    subset = df[df[color_column] == grp]
                    ax.scatter(subset[x_column], subset[y_column], alpha=0.6, label=str(grp), s=50)
                ax.legend()
            else:
                ax.scatter(df[x_column], df[y_column], alpha=0.6, s=50, color='steelblue')
            ax.set_xlabel(x_column)
            ax.set_ylabel(y_column)
        elif chart_type == "bar":
            if y_column:
                data = df.groupby(x_column)[y_column].sum().sort_values(ascending=False)
            else:
                data = df[x_column].value_counts()
            colors = plt.cm.Blues(np.linspace(0.4, 0.8, len(data)))
            bars = ax.bar(range(len(data)), data.values, color=colors)
            ax.set_xticks(range(len(data)))
            ax.set_xticklabels([str(x) for x in data.index], rotation=45, ha='right')
            ax.set_xlabel(x_column)
            ax.set_ylabel(y_column if y_column else "Count")
            for bar, val in zip(bars, data.values):
                ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01 * max(data.values),
                        f'{val:,.0f}', ha='center', va='bottom', fontsize=9)
        elif chart_type == "line":
            if not y_column:
                return {"status": "error", "message": "y_column required for line"}
            df_sorted = df.sort_values(x_column)
            if color_column and color_column in df.columns:
                for grp in df_sorted[color_column].unique():
                    subset = df_sorted[df_sorted[color_column] == grp]
                    ax.plot(subset[x_column], subset[y_column], label=str(grp), marker='o', markersize=3)
                ax.legend()
            else:
                ax.plot(df_sorted[x_column], df_sorted[y_column], marker='o', markersize=3, color='steelblue')
            ax.set_xlabel(x_column)
            ax.set_ylabel(y_column)
            plt.xticks(rotation=45)
        elif chart_type == "box":
            if color_column and color_column in df.columns:
                groups = df[color_column].unique()
                data_groups = [df[df[color_column] == g][x_column].dropna() for g in groups]
                bp = ax.boxplot(data_groups, labels=[str(g) for g in groups], patch_artist=True)
                colors = plt.cm.Blues(np.linspace(0.4, 0.8, len(groups)))
                for patch, color in zip(bp['boxes'], colors):
                    patch.set_facecolor(color)
                ax.set_xlabel(color_column)
            else:
                bp = ax.boxplot(df[x_column].dropna(), patch_artist=True)
                bp['boxes'][0].set_facecolor('steelblue')
            ax.set_ylabel(x_column)
        elif chart_type == "heatmap":
            numeric_df = df.select_dtypes(include=[np.number])
            corr = numeric_df.corr()
            im = ax.imshow(corr, cmap='RdBu_r', aspect='auto', vmin=-1, vmax=1)
            ax.set_xticks(range(len(corr.columns)))
            ax.set_yticks(range(len(corr.columns)))
            ax.set_xticklabels(corr.columns, rotation=45, ha='right')
            ax.set_yticklabels(corr.columns)
            for i in range(len(corr)):
                for j in range(len(corr)):
                    val = corr.iloc[i, j]
                    color = 'white' if abs(val) > 0.5 else 'black'
                    ax.text(j, i, f'{val:.2f}', ha='center', va='center', color=color, fontsize=8)
            plt.colorbar(im, ax=ax, label='Correlation')
        elif chart_type == "pie":
            data = df[x_column].value_counts()
            colors = plt.cm.Blues(np.linspace(0.3, 0.9, len(data)))
            wedges, texts, autotexts = ax.pie(data.values, labels=data.index, autopct='%1.1f%%',
                                              colors=colors, startangle=90)
            ax.axis('equal')
        else:
            return {"status": "error", "message": f"Unknown chart: {chart_type}. Use: histogram, scatter, bar, line, box, heatmap, pie"}
        ax.set_title(chart_title, fontsize=12, fontweight='bold')
        plt.tight_layout()
        plt.show()
        plt.close()
        return make_serializable({
            "status": "success",
            "chart_type": chart_type,
            "title": chart_title,
            "message": "Chart displayed successfully"
        })
    except Exception as e:
        plt.close()
        return {"status": "error", "message": f"Visualization failed: {str(e)}"}
def create_distribution_report(dataset_name: str, column: str, tool_context: ToolContext = None) -> dict:
    print(f"Distribution report: {column} in {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    if column not in df.columns:
        return {"status": "error", "message": f"Column '{column}' not found"}
    data = df[column].dropna()
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    fig.suptitle(f'Distribution Analysis: {column}', fontsize=14, fontweight='bold')
    axes[0, 0].hist(data, bins=30, density=True, alpha=0.7, color='steelblue', edgecolor='black')
    data.plot.kde(ax=axes[0, 0], color='red', linewidth=2)
    axes[0, 0].set_title('Histogram with KDE')
    axes[0, 0].set_xlabel(column)
    axes[0, 0].set_ylabel('Density')
    bp = axes[0, 1].boxplot(data, vert=True, patch_artist=True)
    bp['boxes'][0].set_facecolor('steelblue')
    axes[0, 1].set_title('Box Plot')
    axes[0, 1].set_ylabel(column)
    stats.probplot(data, dist="norm", plot=axes[1, 0])
    axes[1, 0].set_title('Q-Q Plot (Normality)')
    vp = axes[1, 1].violinplot(data, vert=True, showmeans=True, showmedians=True)
    vp['bodies'][0].set_facecolor('steelblue')
    axes[1, 1].set_title('Violin Plot')
    axes[1, 1].set_ylabel(column)
    plt.tight_layout()
    plt.show()
    plt.close()
    skew_val = float(data.skew())
    shape = "approximately symmetric" if abs(skew_val) < 0.5 else ("right-skewed" if skew_val > 0 else "left-skewed")
    return make_serializable({
        "status": "success",
        "column": column,
        "statistics": {
            "count": int(len(data)),
            "mean": round(float(data.mean()), 3),
            "median": round(float(data.median()), 3),
            "std": round(float(data.std()), 3),
            "skewness": round(skew_val, 3),
            "kurtosis": round(float(data.kurtosis()), 3),
            "min": round(float(data.min()), 3),
            "max": round(float(data.max()), 3)
        },
        "distribution_shape": shape,
        "message": "4 distribution plots displayed"
    })

print("Visualization tools defined!")
We create the visualization capabilities of the pipeline. We define flexible charting functions for common plot types and also build a richer distribution report that shows several views of a variable at once. We use this part to visually explore patterns, relationships, spread, and potential issues in the data.
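Outside a notebook, `plt.show()` has no display to draw to; a useful variation (not part of the tutorial itself, just a sketch) is to render the same kind of chart off-screen with the Agg backend and capture it as PNG bytes, which an agent could attach to its response:

```python
import io
import matplotlib
matplotlib.use("Agg")  # render off-screen; no display required
import matplotlib.pyplot as plt
import numpy as np

rng = np.random.default_rng(1)
values = rng.normal(0, 1, 500)

# Same histogram styling the create_visualization tool uses
fig, ax = plt.subplots(figsize=(6, 4))
ax.hist(values, bins=30, color="steelblue", edgecolor="black", alpha=0.7)
ax.set_title("Histogram sketch", fontweight="bold")

# Capture the figure as PNG bytes instead of calling plt.show()
buf = io.BytesIO()
fig.savefig(buf, format="png")
plt.close(fig)
print(f"PNG bytes: {len(buf.getvalue())}")
```

The buffer can then be base64-encoded or written to disk, depending on how the agent is expected to deliver charts.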
def filter_data(dataset_name: str, condition: str, new_dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Filtering {dataset_name}: {condition}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    try:
        filtered = df.query(condition)
        DATA_STORE.add_dataset(new_dataset_name, filtered, source=f"filtered:{dataset_name}")
        datasets = tool_context.state.get("loaded_datasets", [])
        if new_dataset_name not in datasets:
            datasets.append(new_dataset_name)
        tool_context.state["loaded_datasets"] = datasets
        return make_serializable({
            "status": "success",
            "original_rows": int(len(df)),
            "filtered_rows": int(len(filtered)),
            "rows_removed": int(len(df) - len(filtered)),
            "new_dataset": new_dataset_name
        })
    except Exception as e:
        return {"status": "error", "message": f"Filter failed: {str(e)}"}
def aggregate_data(dataset_name: str, group_by: str, aggregations: str,
                   new_dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Aggregating {dataset_name} by {group_by}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    try:
        group_cols = [c.strip() for c in group_by.split(",")]
        agg_dict = {}
        for agg in aggregations.split(","):
            col, func = agg.strip().split(":")
            agg_dict[col.strip()] = func.strip()
        result_df = df.groupby(group_cols).agg(agg_dict).reset_index()
        new_cols = list(group_cols) + [f"{col}_{func}" for col, func in agg_dict.items()]
        result_df.columns = new_cols
        DATA_STORE.add_dataset(new_dataset_name, result_df, source=f"aggregated:{dataset_name}")
        datasets = tool_context.state.get("loaded_datasets", [])
        if new_dataset_name not in datasets:
            datasets.append(new_dataset_name)
        tool_context.state["loaded_datasets"] = datasets
        return make_serializable({
            "status": "success",
            "grouped_by": group_cols,
            "aggregations": agg_dict,
            "result_rows": int(len(result_df)),
            "columns": list(result_df.columns),
            "new_dataset": new_dataset_name,
            "preview": result_df.head(5).to_dict(orient="records")
        })
    except Exception as e:
        return {"status": "error", "message": f"Aggregation failed: {str(e)}"}
def add_calculated_column(dataset_name: str, new_column: str, expression: str,
                          tool_context: ToolContext) -> dict:
    print(f"Adding column '{new_column}' to {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    try:
        df_copy = df.copy()
        df_copy[new_column] = df_copy.eval(expression)
        DATA_STORE.datasets[dataset_name]["data"] = df_copy
        DATA_STORE.datasets[dataset_name]["columns"] = list(df_copy.columns)
        sample_vals = df_copy[new_column].head(5)
        return make_serializable({
            "status": "success",
            "new_column": new_column,
            "expression": expression,
            "sample_values": [round(float(x), 3) if pd.notna(x) else None for x in sample_vals]
        })
    except Exception as e:
        return {"status": "error", "message": f"Calculation failed: {str(e)}"}

print("Transformation tools defined!")
def generate_summary_report(dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"Generating report: {dataset_name}")
    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    report = {
        "dataset": dataset_name,
        "generated_at": datetime.now().isoformat(),
        "overview": {
            "rows": int(len(df)),
            "columns": int(len(df.columns)),
            "numeric_cols": int(len(numeric_cols)),
            "categorical_cols": int(len(cat_cols)),
            "memory_mb": round(float(df.memory_usage(deep=True).sum() / 1024 / 1024), 2),
            "duplicates": int(df.duplicated().sum()),
            "complete_rows_pct": round(float((len(df) - df.isnull().any(axis=1).sum()) / len(df) * 100), 1)
        },
        "data_quality": {
            "total_missing": int(df.isnull().sum().sum()),
            "missing_pct": round(float(df.isnull().sum().sum() / (len(df) * len(df.columns)) * 100), 2),
            "columns_with_missing": [col for col in df.columns if df[col].isnull().sum() > 0]
        }
    }
    if numeric_cols:
        insights = {}
        for col in numeric_cols[:8]:
            data = df[col].dropna()
            if len(data) > 0:
                insights[col] = {
                    "mean": round(float(data.mean()), 2),
                    "median": round(float(data.median()), 2),
                    "std": round(float(data.std()), 2),
                    "range": [round(float(data.min()), 2), round(float(data.max()), 2)]
                }
        report["numeric_insights"] = insights
    if cat_cols:
        cat_insights = {}
        for col in cat_cols[:5]:
            cat_insights[col] = {
                "unique": int(df[col].nunique()),
                "top_3": {str(k): int(v) for k, v in df[col].value_counts().head(3).items()}
            }
        report["categorical_insights"] = cat_insights
    findings = []
    if report["data_quality"]["missing_pct"] > 5:
        findings.append(f"{report['data_quality']['missing_pct']}% missing data")
    if report["overview"]["duplicates"] > 0:
        findings.append(f"{report['overview']['duplicates']} duplicate rows")
    if not findings:
        findings.append("Data quality looks good")
    report["key_findings"] = findings
    return make_serializable({"status": "success", "report": report})

def get_analysis_history(tool_context: ToolContext) -> dict:
    history = DATA_STORE.analysis_history
    if not history:
        return {"status": "info", "message": "No analyses performed yet"}
    return make_serializable({"status": "success", "count": int(len(history)), "history": history[-15:]})

print("Reporting tools defined!")
We focus on transforming data and producing structured reports. We create functions to filter rows, aggregate values, add calculated columns, and summarize the dataset into a clear report with quality indicators and key findings. We use these steps to reshape the data and prepare concise outputs that support deeper analysis and decision-making.
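The `aggregate_data` tool accepts its aggregation plan as a compact `"column:func,column:func"` string so an LLM can pass it as one parameter. A minimal sketch of how that spec parses into a pandas `groupby().agg()` call (the small demo frame is made up for illustration):

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["North", "North", "South", "South"],
    "revenue": [100.0, 150.0, 80.0, 120.0],
    "profit": [20.0, 30.0, 10.0, 25.0],
})

# Parse a "col:func,col:func" spec, as the aggregate_data tool expects
spec = "revenue:sum,profit:mean"
agg_dict = dict(part.strip().split(":") for part in spec.split(","))

# Group, aggregate, and flatten the column names the same way the tool does
out = df.groupby("region").agg(agg_dict).reset_index()
out.columns = ["region"] + [f"{c}_{f}" for c, f in agg_dict.items()]
print(out)
```

Keeping the spec flat like this avoids asking the model to emit nested JSON for a simple aggregation request.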
data_loader_agent = Agent(
    name="data_loader",
    model=LiteLlm(model=MODEL),
    description="Loads CSV files, creates sample datasets (sales, customers, timeseries, survey)",
    instruction="""You load data into the analysis pipeline.
TOOLS:
- create_sample_dataset: Create test data. Types: 'sales', 'customers', 'timeseries', 'survey'
- load_csv: Load from file path or URL
- list_available_datasets: Show what's loaded
Always use clear dataset names like 'sales_data', 'customer_analysis'.""",
    tools=[load_csv, create_sample_dataset, list_available_datasets]
)
stats_agent = Agent(
    name="statistician",
    model=LiteLlm(model=MODEL),
    description="Statistical analysis: descriptive stats, correlations, hypothesis tests, outliers",
    instruction="""You perform statistical analysis.
TOOLS:
- describe_dataset: Full descriptive statistics
- correlation_analysis: Correlation matrix (pearson/spearman)
- hypothesis_test: Tests (normality, ttest, anova, chi2)
- outlier_detection: Find outliers (iqr/zscore)
Explain results in plain language alongside statistics.""",
    tools=[describe_dataset, correlation_analysis, hypothesis_test, outlier_detection]
)
viz_agent = Agent(
    name="visualizer",
    model=LiteLlm(model=MODEL),
    description="Creates charts: histogram, scatter, bar, line, box, heatmap, pie",
    instruction="""You create visualizations.
TOOLS:
- create_visualization: Charts (histogram, scatter, bar, line, box, heatmap, pie)
- create_distribution_report: 4-plot distribution analysis
GUIDE:
- Single variable distribution → histogram or box
- Two numeric variables → scatter
- Category comparison → bar
- Time trends → line
- Correlations overview → heatmap""",
    tools=[create_visualization, create_distribution_report]
)
transform_agent = Agent(
    name="transformer",
    model=LiteLlm(model=MODEL),
    description="Data transformation: filter, aggregate, calculate columns",
    instruction="""You transform data.
TOOLS:
- filter_data: Filter rows (e.g., condition='age > 30')
- aggregate_data: Group & aggregate (e.g., group_by='region', aggregations='revenue:sum,profit:mean')
- add_calculated_column: New columns (e.g., expression='revenue * 0.1')
Always create new dataset names - don't overwrite originals.""",
    tools=[filter_data, aggregate_data, add_calculated_column]
)
report_agent = Agent(
    name="reporter",
    model=LiteLlm(model=MODEL),
    description="Generates summary reports and tracks analysis history",
    instruction="""You create reports.
TOOLS:
- generate_summary_report: Comprehensive dataset summary
- get_analysis_history: View all analyses performed""",
    tools=[generate_summary_report, get_analysis_history]
)

print("Specialist agents created!")
master_analyst = Agent(
    name="data_analyst",
    model=LiteLlm(model=MODEL),
    description="Master Data Analyst orchestrating end-to-end data analysis",
    instruction="""You are an expert Data Analyst with a team of specialists.
YOUR TEAM:
1. data_loader - Load/create datasets
2. statistician - Statistical analysis
3. visualizer - Charts and plots
4. transformer - Data transformations
5. reporter - Reports and summaries
WORKFLOW:
1. Load data → 2. Describe → 3. Visualize → 4. Analyze → 5. Transform if needed → 6. Report
Be helpful, explain insights clearly, suggest next steps.""",
    sub_agents=[data_loader_agent, stats_agent, viz_agent, transform_agent, report_agent]
)

print(f"Master Analyst ready with {len(master_analyst.sub_agents)} specialists!")
session_service = InMemorySessionService()
APP_NAME = "data_pipeline"
USER_ID = "analyst"
SESSION_ID = "session_001"

async def init():
    return await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID,
        state={"loaded_datasets": [], "active_dataset": None}
    )

session = await init()
runner = Runner(agent=master_analyst, app_name=APP_NAME, session_service=session_service)
async def analyze(query: str):
    print(f"\n{'='*70}\nYou: {query}\n{'='*70}")
    content = types.Content(role='user', parts=[types.Part(text=query)])
    response = ""
    try:
        async for event in runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content):
            if event.is_final_response() and event.content and event.content.parts:
                response = event.content.parts[0].text
                break
    except Exception as e:
        response = f"Error: {str(e)}"
    print(f"\nAnalyst: {response}\n{'='*70}\n")

print("Ready! Use: await analyze('your question')")
print("="*70 + "\nDATA ANALYSIS DEMO\n" + "="*70)

await analyze("Create a sales dataset for analysis")
await analyze("Describe the sales_data - what columns and statistics do we have?")
await analyze("Create a histogram of revenue")
await analyze("Show a bar chart of total revenue by region")
await analyze("What's the correlation between quantity, unit_price, and revenue?")
await analyze("Is there a significant difference in revenue between customer types? Run ANOVA.")
await analyze("Check for outliers in the revenue column")
await analyze("Create a heatmap of correlations")
await analyze("Generate a summary report")
print("""
╔════════════════════════════════════════════════════════════════════╗
║                     INTERACTIVE DATA ANALYSIS                       ║
╠════════════════════════════════════════════════════════════════════╣
║ Try these:                                                          ║
║                                                                     ║
║ await analyze("Create a customers dataset")                         ║
║ await analyze("Show scatter plot of age vs income")                 ║
║ await analyze("Is income normally distributed?")                    ║
║ await analyze("Compare income between education levels")            ║
║ await analyze("Filter customers where age > 40")                    ║
║ await analyze("Calculate average lifetime_value by churn_risk")     ║
╚════════════════════════════════════════════════════════════════════╝
""")
We assemble the full multi-agent system by creating specialist agents and connecting them under one master analyst. We initialize the session, define the async analysis function, and run a full demo that shows how the pipeline behaves in practice. We use this final section to orchestrate the entire workflow end-to-end, from dataset creation to analysis, visualization, and reporting.
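Note that calling await analyze(...) at the top level only works in a notebook environment like Colab. In a plain Python script, the same flow would be wrapped with asyncio.run. The sketch below uses a stand-in coroutine (the real analyze calls runner.run_async, which requires ADK and API credentials) purely to show the wiring:

```python
import asyncio

# Stand-in for the notebook's `analyze` coroutine (hypothetical: the real one
# streams events from runner.run_async; here we just echo the query).
async def analyze(query: str) -> str:
    return f"Analyst: received '{query}'"

async def main():
    results = []
    for q in ["Create a sales dataset for analysis", "Generate a summary report"]:
        results.append(await analyze(q))
    return results

# Scripts have no top-level await, so the event loop is driven explicitly:
results = asyncio.run(main())
print(results[0])
```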
In conclusion, we created a complete and interactive data analysis framework that goes far beyond a basic notebook workflow. We combined data ingestion, descriptive analytics, hypothesis testing, outlier detection, chart generation, transformation operations, and reporting into one unified agent-driven pipeline. As we tested the system with sample queries and analysis requests, we demonstrated how Google ADK can help us design a modular, extensible, and production-ready analytics assistant that supports clearer insights, faster iteration, and a more intelligent approach to data exploration.
The post Google ADK Multi-Agent Pipeline Tutorial: Data Loading, Statistical Testing, Visualization, and Report Generation in Python appeared first on MarkTechPost.
