PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation
In this tutorial, we build an advanced, Colab-ready workflow around PyGraphistry for interactive graph analytics and visualization. We begin by creating a realistic enterprise-style access dataset, transforming it into nodes and edges, and enriching the graph with risk scores, anomaly indicators, centrality metrics, community detection, and layout embeddings. We then use PyGraphistry to bind graph structure, visual encodings, labels, tooltips, and filtered subgraphs, and we generate local interactive visualizations when Graphistry credentials are not configured. Through this implementation, we see how graph intelligence helps us investigate suspicious users, risky devices, IP relationships, sensitive services, and high-risk behavioral patterns in a practical security analytics setting.




Installing PyGraphistry and Dependencies
import os, sys, subprocess, warnings, textwrap, json, math, random
warnings.filterwarnings("ignore")

def pip_install(packages):
    # Install or upgrade packages with the interpreter that runs this notebook.
    subprocess.run([sys.executable, "-m", "pip", "install", "-q", "-U", *packages], check=True)

pip_install([
    "graphistry[networkx,umap-learn]",
    "pandas",
    "numpy",
    "networkx",
    "scikit-learn",
    "pyvis",
    "matplotlib",
    "pyarrow"
])
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import graphistry
from pathlib import Path
from IPython.display import display, HTML, IFrame
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA
from pyvis.network import Network

OUT_DIR = Path("/content/pygraphistry_advanced_tutorial")
OUT_DIR.mkdir(parents=True, exist_ok=True)
SEED = 42
rng = np.random.default_rng(SEED)
random.seed(SEED)
print("=" * 100)
print("PyGraphistry Advanced Colab Tutorial")
print("=" * 100)
print("This tutorial builds an enterprise-style access graph, computes graph analytics,")
print("creates suspicious subgraphs, exports graph artifacts, and optionally uploads")
print("interactive visualizations to Graphistry Hub if credentials are available.")
print("=" * 100)

def colab_secret(name, default=""):
    # Prefer a Colab secret, then an environment variable, then the default.
    value = os.environ.get(name, default)
    try:
        from google.colab import userdata
        secret_value = userdata.get(name)
        if secret_value:
            value = secret_value
    except Exception:
        pass
    return value or default

GRAPHISTRY_SERVER = colab_secret("GRAPHISTRY_SERVER", "hub.graphistry.com")
GRAPHISTRY_PROTOCOL = colab_secret("GRAPHISTRY_PROTOCOL", "https")
GRAPHISTRY_USERNAME = colab_secret("GRAPHISTRY_USERNAME", "")
GRAPHISTRY_PASSWORD = colab_secret("GRAPHISTRY_PASSWORD", "")
GRAPHISTRY_PERSONAL_KEY_ID = colab_secret("GRAPHISTRY_PERSONAL_KEY_ID", "")
GRAPHISTRY_PERSONAL_KEY_SECRET = colab_secret("GRAPHISTRY_PERSONAL_KEY_SECRET", "")
REGISTERED = False
try:
    if GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET:
        graphistry.register(
            api=3,
            protocol=GRAPHISTRY_PROTOCOL,
            server=GRAPHISTRY_SERVER,
            personal_key_id=GRAPHISTRY_PERSONAL_KEY_ID,
            personal_key_secret=GRAPHISTRY_PERSONAL_KEY_SECRET
        )
        REGISTERED = True
        print("Graphistry registered with personal key credentials.")
    elif GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD:
        graphistry.register(
            api=3,
            protocol=GRAPHISTRY_PROTOCOL,
            server=GRAPHISTRY_SERVER,
            username=GRAPHISTRY_USERNAME,
            password=GRAPHISTRY_PASSWORD
        )
        REGISTERED = True
        print("Graphistry registered with username/password credentials.")
    else:
        graphistry.register(api=3, protocol=GRAPHISTRY_PROTOCOL, server=GRAPHISTRY_SERVER)
        print("No Graphistry credentials found. Local analytics will run; Graphistry .plot() uploads will be skipped.")
        print("To enable live Graphistry plots, add Colab secrets:")
        print("GRAPHISTRY_PERSONAL_KEY_ID and GRAPHISTRY_PERSONAL_KEY_SECRET")
        print("or GRAPHISTRY_USERNAME and GRAPHISTRY_PASSWORD")
except Exception as e:
    REGISTERED = False
    print("Graphistry registration was not completed:", repr(e))
    print("Continuing with local analytics and local HTML visualization.")

def nid(kind, value):
    # Namespace node IDs by entity type, e.g. "user:user_001".
    return f"{kind}:{value}"
We set up the complete Colab environment by installing PyGraphistry and all supporting libraries for graph analytics, visualization, and machine learning. We configure the output directory, random seed, and Graphistry credentials so the notebook works both locally and with Graphistry Hub. We also define a reusable helper for node naming to keep each entity type clearly separated in the graph.
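To see why the type prefix matters, here is a small sketch of the namespacing idea. The `nid` function mirrors the helper above; `parse_nid` is a hypothetical inverse added only for illustration. Splitting on the first colon only keeps any later colons (for example, in an IPv6 address) inside the value.

```python
def nid(kind, value):
    """Build a namespaced node ID such as 'user:alice'."""
    return f"{kind}:{value}"

def parse_nid(node_id):
    """Hypothetical inverse of nid(): split only on the first colon."""
    kind, value = node_id.split(":", 1)
    return kind, value

# The same raw value under two entity types yields two distinct nodes.
a = nid("user", "vault")
b = nid("service", "vault")
print(a, b, a != b)

# Values that contain colons survive the round trip.
print(parse_nid(nid("ip", "fe80::1")))
```

Without the prefix, a user and a service that happen to share a name would collapse into a single node and silently merge their edges.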
Generating Enterprise Access Dataset
n_users = 55
n_devices = 42
n_ips = 36
n_services = 15
n_roles = 7
n_geos = 10
n_events = 2200
users = [f"user_{i:03d}" for i in range(n_users)]
devices = [f"device_{i:03d}" for i in range(n_devices)]
ips = [f"10.{i // 255}.{i % 255}.{rng.integers(1, 255)}" for i in range(1, n_ips + 1)]
services = [
    "salesforce", "snowflake", "github", "jira", "slack",
    "vpn", "okta", "aws_console", "gcp_console", "databricks",
    "hris", "email", "crm", "vault", "payments_api"
]
roles = ["employee", "analyst", "engineer", "manager", "admin", "contractor", "service_account"]
geos = ["IN", "US", "GB", "DE", "SG", "AE", "BR", "NL", "AU", "JP"]
# sorted() is used wherever we sample from a set: set iteration order is not
# stable across interpreter runs, which would defeat the fixed seed.
privileged_users = set(rng.choice(users, size=7, replace=False))
compromised_users = set(rng.choice(sorted(set(users) - privileged_users), size=4, replace=False))
risky_devices = set(rng.choice(devices, size=5, replace=False))
risky_ips = set(rng.choice(ips, size=5, replace=False))
sensitive_services = {"aws_console", "gcp_console", "vault", "payments_api", "snowflake"}
user_role = {}
for u in users:
    if u in privileged_users:
        user_role[u] = rng.choice(["admin", "manager", "engineer"], p=[0.55, 0.2, 0.25])
    elif rng.random() < 0.08:
        user_role[u] = "contractor"
    else:
        user_role[u] = rng.choice(["employee", "analyst", "engineer"], p=[0.45, 0.25, 0.30])
user_home_geo = {u: rng.choice(geos, p=[0.30, 0.22, 0.10, 0.08, 0.08, 0.05, 0.04, 0.04, 0.04, 0.05]) for u in users}
device_owner = {d: rng.choice(users) for d in devices}
base_time = pd.Timestamp("2026-06-01 00:00:00")
events = []
for i in range(n_events):
    # Over-sample compromised users so suspicious activity is well represented.
    if rng.random() < 0.18:
        user = rng.choice(sorted(compromised_users))
    else:
        user = rng.choice(users)
    if user in compromised_users and rng.random() < 0.42:
        device = rng.choice(sorted(risky_devices))
    else:
        owned = [d for d, owner in device_owner.items() if owner == user]
        device = rng.choice(owned if owned and rng.random() < 0.78 else devices)
    if user in compromised_users and rng.random() < 0.50:
        ip = rng.choice(sorted(risky_ips))
    else:
        ip = rng.choice(ips)
    if user in compromised_users and rng.random() < 0.45:
        service = rng.choice(sorted(sensitive_services))
    else:
        service = rng.choice(services)
    role = user_role[user]
    home_geo = user_home_geo[user]
    geo = home_geo if rng.random() < 0.88 else rng.choice([g for g in geos if g != home_geo])
    hour = int(rng.integers(0, 24))
    minute = int(rng.integers(0, 60))
    timestamp = base_time + pd.Timedelta(days=int(rng.integers(0, 10)), hours=hour, minutes=minute)
    impossible_travel = int(geo != home_geo and rng.random() < 0.65)
    off_hours = int(hour < 6 or hour > 21)
    service_sensitivity = 1.0 if service in sensitive_services else 0.25
    privileged = int(role in ["admin", "manager", "service_account"])
    compromised = int(user in compromised_users)
    risky_infra = int(device in risky_devices or ip in risky_ips)
    # Weighted sum of risk indicators plus Gaussian noise, clipped to [0, 1].
    risk_score = (
        0.08
        + 0.22 * compromised
        + 0.18 * risky_infra
        + 0.17 * impossible_travel
        + 0.13 * off_hours
        + 0.15 * service_sensitivity
        + 0.07 * privileged
        + rng.normal(0, 0.06)
    )
    risk_score = float(np.clip(risk_score, 0.0, 1.0))
    success_probability = 0.96 - 0.45 * risk_score
    is_success = bool(rng.random() < success_probability)
    amount = float(np.round(np.exp(rng.normal(7.0 + 1.4 * service_sensitivity, 0.8)), 2))
    if service not in {"payments_api", "vault", "snowflake"}:
        amount = float(np.round(amount * rng.uniform(0.01, 0.10), 2))
    events.append({
        "event_id": f"evt_{i:05d}",
        "timestamp": timestamp,
        "user": user,
        "device": device,
        "ip": ip,
        "service": service,
        "role": role,
        "geo": geo,
        "home_geo": home_geo,
        "is_success": is_success,
        "off_hours": bool(off_hours),
        "impossible_travel": bool(impossible_travel),
        "risk_score": risk_score,
        "amount": amount,
        "is_seeded_compromise": bool(user in compromised_users),
        "is_privileged_user": bool(user in privileged_users),
        "is_risky_device": bool(device in risky_devices),
        "is_risky_ip": bool(ip in risky_ips)
    })
events_df = pd.DataFrame(events)
events_df["timestamp"] = pd.to_datetime(events_df["timestamp"])
print("\nRaw event sample:")
display(events_df.head(10))
print("\nSeeded high-risk entities used for validation:")
print("Compromised users:", sorted(compromised_users))
print("Risky devices:", sorted(risky_devices))
print("Risky IPs:", sorted(risky_ips))
We generate a realistic synthetic enterprise access dataset with users, devices, IPs, services, roles, and geographic locations. We simulate normal and suspicious behavior by adding compromised users, risky devices, risky IPs, off-hours activity, impossible travel, and sensitive service access. We convert these events into a structured DataFrame that serves as the foundation for our graph-based security investigation.
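The deterministic part of the risk score can be checked by hand. The sketch below reuses the weights from the generator but drops the Gaussian noise term, so the function name `base_risk` and its signature are illustrative assumptions rather than part of the notebook.

```python
def base_risk(compromised, risky_infra, impossible_travel, off_hours,
              service_sensitivity, privileged):
    """Noise-free version of the tutorial's risk formula, clipped to [0, 1]."""
    score = (
        0.08
        + 0.22 * compromised
        + 0.18 * risky_infra
        + 0.17 * impossible_travel
        + 0.13 * off_hours
        + 0.15 * service_sensitivity
        + 0.07 * privileged
    )
    return min(max(score, 0.0), 1.0)

# A routine login to a non-sensitive service during business hours.
benign = base_risk(0, 0, 0, 0, 0.25, 0)
# Every indicator fires against a sensitive service.
worst = base_risk(1, 1, 1, 1, 1.0, 1)
print(round(benign, 4), round(worst, 4))
```

The baseline and weights add up to 1.0 when every indicator fires, so the clip mainly matters once the noise term is added back.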
Building Graph Edges Table
edge_rows = []
for row in events_df.itertuples(index=False):
    user_node = nid("user", row.user)
    device_node = nid("device", row.device)
    ip_node = nid("ip", row.ip)
    service_node = nid("service", row.service)
    role_node = nid("role", row.role)
    geo_node = nid("geo", row.geo)
    home_geo_node = nid("geo", row.home_geo)
    event_node = nid("event", row.event_id)
    # Event-level attributes copied onto every relationship the event produces.
    base = {
        "event_id": row.event_id,
        "timestamp": row.timestamp,
        "risk_score": row.risk_score,
        "amount": row.amount,
        "is_success": row.is_success,
        "off_hours": row.off_hours,
        "impossible_travel": row.impossible_travel,
        "is_seeded_compromise": row.is_seeded_compromise
    }
    edge_rows.extend([
        {**base, "src": user_node, "dst": device_node, "src_type": "user", "dst_type": "device", "relation": "USES_DEVICE"},
        {**base, "src": user_node, "dst": service_node, "src_type": "user", "dst_type": "service", "relation": "ACCESSES_SERVICE"},
        {**base, "src": device_node, "dst": ip_node, "src_type": "device", "dst_type": "ip", "relation": "CONNECTS_FROM_IP"},
        {**base, "src": ip_node, "dst": geo_node, "src_type": "ip", "dst_type": "geo", "relation": "RESOLVES_TO_GEO"},
        {**base, "src": user_node, "dst": role_node, "src_type": "user", "dst_type": "role", "relation": "HAS_ROLE"},
        {**base, "src": user_node, "dst": home_geo_node, "src_type": "user", "dst_type": "geo", "relation": "HOME_GEO"}
    ])
raw_edges_df = pd.DataFrame(edge_rows)
# Collapse repeated interactions into one weighted edge per (src, dst, relation).
edges_df = (
    raw_edges_df
    .groupby(["src", "dst", "relation", "src_type", "dst_type"], as_index=False)
    .agg(
        event_count=("event_id", "nunique"),
        first_seen=("timestamp", "min"),
        last_seen=("timestamp", "max"),
        max_risk=("risk_score", "max"),
        avg_risk=("risk_score", "mean"),
        failed_count=("is_success", lambda s: int((~s).sum())),
        off_hours_count=("off_hours", "sum"),
        impossible_travel_count=("impossible_travel", "sum"),
        amount_sum=("amount", "sum"),
        seeded_compromise_count=("is_seeded_compromise", "sum")
    )
)
edges_df["edge_id"] = [f"edge_{i:05d}" for i in range(len(edges_df))]
edges_df["edge_label"] = edges_df["relation"] + " | n=" + edges_df["event_count"].astype(str)
edges_df["edge_size"] = np.clip(np.log1p(edges_df["event_count"]) * 2.5, 1, 20)
edges_df["edge_title"] = edges_df.apply(
    lambda r: (
        f"<b>{r['relation']}</b><br>"
        f"{r['src']} → {r['dst']}<br>"
        f"events: {int(r['event_count'])}<br>"
        f"max risk: {r['max_risk']:.3f}<br>"
        f"avg risk: {r['avg_risk']:.3f}<br>"
        f"failures: {int(r['failed_count'])}<br>"
        f"off-hours: {int(r['off_hours_count'])}<br>"
        f"impossible-travel: {int(r['impossible_travel_count'])}<br>"
        f"amount sum: {r['amount_sum']:.2f}"
    ),
    axis=1
)
edges_df["first_seen"] = edges_df["first_seen"].astype(str)
edges_df["last_seen"] = edges_df["last_seen"].astype(str)
all_node_ids = sorted(set(edges_df["src"]).union(set(edges_df["dst"])))
nodes_df = pd.DataFrame({"id": all_node_ids})
nodes_df["entity_type"] = nodes_df["id"].str.split(":", n=1).str[0]
nodes_df["label"] = nodes_df["id"].str.split(":", n=1).str[1]
# A node "touches" an event if it appears as either endpoint of one of its edges.
touch_src = raw_edges_df[["src", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"src": "id"})
touch_dst = raw_edges_df[["dst", "event_id", "risk_score", "amount", "is_success", "off_hours", "impossible_travel"]].rename(columns={"dst": "id"})
touches = pd.concat([touch_src, touch_dst], ignore_index=True)
node_stats = (
    touches
    .groupby("id", as_index=False)
    .agg(
        touched_events=("event_id", "nunique"),
        max_risk=("risk_score", "max"),
        avg_risk=("risk_score", "mean"),
        failed_touches=("is_success", lambda s: int((~s).sum())),
        off_hours_touches=("off_hours", "sum"),
        impossible_travel_touches=("impossible_travel", "sum"),
        amount_touched=("amount", "sum")
    )
)
nodes_df = nodes_df.merge(node_stats, on="id", how="left").fillna({
    "touched_events": 0,
    "max_risk": 0.0,
    "avg_risk": 0.0,
    "failed_touches": 0,
    "off_hours_touches": 0,
    "impossible_travel_touches": 0,
    "amount_touched": 0.0
})
We transform raw event data into graph relationships by creating edges between users, devices, IPs, services, roles, and geographies. We aggregate repeated interactions into weighted edges with risk scores, counts, failures, timestamps, and activity summaries. We also create the node table and compute basic node-level statistics from all entity interactions.
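The aggregation step collapses many event-level rows into one weighted edge per (src, dst, relation) triple. As a dependency-free illustration of what the grouped aggregation computes, the sketch below uses made-up rows and only a subset of the summary columns; it is not the notebook's pandas code.

```python
from collections import defaultdict

# Made-up event-level relationship rows (illustrative data only).
raw_rows = [
    {"src": "user:u1", "dst": "service:vault", "relation": "ACCESSES_SERVICE",
     "event_id": "evt_1", "risk_score": 0.40, "is_success": True},
    {"src": "user:u1", "dst": "service:vault", "relation": "ACCESSES_SERVICE",
     "event_id": "evt_2", "risk_score": 0.90, "is_success": False},
    {"src": "user:u2", "dst": "service:jira", "relation": "ACCESSES_SERVICE",
     "event_id": "evt_3", "risk_score": 0.20, "is_success": True},
]

# Group rows by the edge key, preserving first-seen order.
groups = defaultdict(list)
for row in raw_rows:
    groups[(row["src"], row["dst"], row["relation"])].append(row)

edges = []
for (src, dst, relation), rows in groups.items():
    risks = [r["risk_score"] for r in rows]
    edges.append({
        "src": src, "dst": dst, "relation": relation,
        "event_count": len({r["event_id"] for r in rows}),  # distinct events
        "max_risk": max(risks),
        "avg_risk": sum(risks) / len(risks),
        "failed_count": sum(not r["is_success"] for r in rows),
    })

for e in edges:
    print(e)
```

Keeping both the maximum and the average risk per edge is useful: the maximum surfaces a single alarming event, while the average shows whether the relationship is risky as a whole.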
Computing Graph Analytics Features
G = nx.DiGraph()
for row in nodes_df.itertuples(index=False):
    G.add_node(row.id, entity_type=row.entity_type, label=row.label)
for row in edges_df.itertuples(index=False):
    G.add_edge(
        row.src,
        row.dst,
        relation=row.relation,
        event_count=float(row.event_count),
        max_risk=float(row.max_risk),
        avg_risk=float(row.avg_risk),
        failed_count=float(row.failed_count),
        amount_sum=float(row.amount_sum)
    )
degree_w = dict(G.degree(weight="event_count"))
in_degree_w = dict(G.in_degree(weight="event_count"))
out_degree_w = dict(G.out_degree(weight="event_count"))
try:
    pagerank = nx.pagerank(G, weight="event_count", max_iter=250)
except Exception:
    pagerank = {n: 0.0 for n in G.nodes()}
try:
    # k caps the number of sampled source nodes for approximate betweenness.
    betweenness = nx.betweenness_centrality(G, k=min(90, max(2, G.number_of_nodes())), seed=SEED)
except Exception:
    betweenness = {n: 0.0 for n in G.nodes()}
UG = G.to_undirected()
try:
    communities = list(nx.community.greedy_modularity_communities(UG, weight="event_count"))
except Exception:
    # Fall back to connected components if modularity optimization fails.
    communities = [set(c) for c in nx.connected_components(UG)]
community_map = {}
for cid, members in enumerate(communities):
    for n in members:
        community_map[n] = cid
nodes_df["degree_w"] = nodes_df["id"].map(degree_w).fillna(0.0)
nodes_df["in_degree_w"] = nodes_df["id"].map(in_degree_w).fillna(0.0)
nodes_df["out_degree_w"] = nodes_df["id"].map(out_degree_w).fillna(0.0)
nodes_df["pagerank"] = nodes_df["id"].map(pagerank).fillna(0.0)
nodes_df["betweenness"] = nodes_df["id"].map(betweenness).fillna(0.0)
nodes_df["community"] = nodes_df["id"].map(community_map).fillna(-1).astype(int)
risk_bins = [-0.001, 0.35, 0.65, 0.85, 1.001]
risk_labels = ["low", "medium", "high", "critical"]
nodes_df["risk_band"] = pd.cut(nodes_df["max_risk"], bins=risk_bins, labels=risk_labels).astype(str)
feature_cols = [
    "touched_events",
    "max_risk",
    "avg_risk",
    "failed_touches",
    "off_hours_touches",
    "impossible_travel_touches",
    "amount_touched",
    "degree_w",
    "in_degree_w",
    "out_degree_w",
    "pagerank",
    "betweenness"
]
X_num = nodes_df[feature_cols].replace([np.inf, -np.inf], 0).fillna(0.0)
X_scaled = StandardScaler().fit_transform(X_num)
iso = IsolationForest(
    n_estimators=250,
    contamination=0.10,
    random_state=SEED
)
iso.fit(X_scaled)
# score_samples is higher for normal points, so negate it: larger = more anomalous.
nodes_df["anomaly_score"] = -iso.score_samples(X_scaled)
nodes_df["is_anomaly"] = iso.predict(X_scaled) == -1
type_color_map = {
    "user": "#1f77b4",
    "device": "#ff7f0e",
    "ip": "#2ca02c",
    "service": "#9467bd",
    "role": "#8c564b",
    "geo": "#17becf",
    "event": "#7f7f7f"
}
nodes_df["node_color"] = nodes_df["entity_type"].map(type_color_map).fillna("#999999")
# Critical nodes are drawn red; anomalies override every other color with black.
nodes_df.loc[nodes_df["risk_band"].eq("critical"), "node_color"] = "#d62728"
nodes_df.loc[nodes_df["is_anomaly"], "node_color"] = "#000000"
size_raw = (
    8
    + 6 * np.log1p(nodes_df["degree_w"].astype(float))
    + 10 * nodes_df["pagerank"].astype(float) / max(nodes_df["pagerank"].max(), 1e-9)
    + 8 * nodes_df["is_anomaly"].astype(int)
)
nodes_df["node_size"] = np.clip(size_raw, 5, 60)
model_features = pd.concat([
    nodes_df[feature_cols + ["anomaly_score"]].replace([np.inf, -np.inf], 0).fillna(0.0),
    pd.get_dummies(nodes_df[["entity_type", "risk_band"]], dtype=float)
], axis=1)
try:
    import umap
    reducer = umap.UMAP(
        n_components=2,
        n_neighbors=min(18, max(2, len(nodes_df) - 1)),
        min_dist=0.08,
        metric="euclidean",
        random_state=SEED
    )
    emb = reducer.fit_transform(StandardScaler().fit_transform(model_features))
    layout_name = "UMAP"
except Exception:
    # PCA gives a deterministic 2D layout if UMAP is unavailable or fails.
    reducer = PCA(n_components=2, random_state=SEED)
    emb = reducer.fit_transform(StandardScaler().fit_transform(model_features))
    layout_name = "PCA fallback"
nodes_df["x"] = emb[:, 0].astype(float)
nodes_df["y"] = emb[:, 1].astype(float)
nodes_df["point_title"] = nodes_df.apply(
    lambda r: (
        f"<b>{r['id']}</b><br>"
        f"type: {r['entity_type']}<br>"
        f"community: {int(r['community'])}<br>"
        f"risk band: {r['risk_band']}<br>"
        f"max risk: {r['max_risk']:.3f}<br>"
        f"avg risk: {r['avg_risk']:.3f}<br>"
        f"weighted degree: {r['degree_w']:.1f}<br>"
        f"pagerank: {r['pagerank']:.6f}<br>"
        f"betweenness: {r['betweenness']:.6f}<br>"
        f"anomaly score: {r['anomaly_score']:.4f}<br>"
        f"is anomaly: {bool(r['is_anomaly'])}"
    ),
    axis=1
)
print("\nGraph summary:")
print(f"Events: {len(events_df):,}")
print(f"Raw relationship rows: {len(raw_edges_df):,}")
print(f"Aggregated edges: {len(edges_df):,}")
print(f"Nodes: {len(nodes_df):,}")
print(f"Communities: {len(communities):,}")
print(f"External layout: {layout_name}")
print("\nNode type counts:")
display(nodes_df["entity_type"].value_counts().rename_axis("entity_type").reset_index(name="count"))
print("\nRisk band counts:")
display(nodes_df["risk_band"].value_counts().rename_axis("risk_band").reset_index(name="count"))
print("\nTop 20 anomalous nodes:")
top_anomalies = (
    nodes_df
    .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
    [["id", "entity_type", "risk_band", "is_anomaly", "anomaly_score", "max_risk", "avg_risk", "degree_w", "pagerank", "community"]]
    .head(20)
)
display(top_anomalies)
print("\nTop 20 risky relationships:")
top_edges = (
    edges_df
    .sort_values(["max_risk", "failed_count", "event_count"], ascending=[False, False, False])
    [["src", "dst", "relation", "event_count", "max_risk", "avg_risk", "failed_count", "off_hours_count", "impossible_travel_count", "amount_sum"]]
    .head(20)
)
display(top_edges)
fig = plt.figure(figsize=(9, 5))
plt.hist(nodes_df["anomaly_score"], bins=30)
plt.title("Node Anomaly Score Distribution")
plt.xlabel("Anomaly score")
plt.ylabel("Node count")
plt.show()
fig = plt.figure(figsize=(9, 5))
nodes_df.groupby("entity_type")["max_risk"].mean().sort_values().plot(kind="bar")
plt.title("Mean Max Risk by Entity Type")
plt.xlabel("Entity type")
plt.ylabel("Mean max risk")
plt.xticks(rotation=45, ha="right")
plt.show()
We build a NetworkX graph from the generated nodes and edges and compute advanced graph analytics. We calculate weighted degree, PageRank, betweenness centrality, communities, risk bands, anomaly scores, and machine-learning-based layout embeddings. We then inspect graph summaries, top anomalous nodes, risky relationships, and risk distributions through tables and plots.
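The risk bands come from right-closed bins, so a score of exactly 0.35 is still "low" and a score of exactly 0.85 is still "high". The sketch below reproduces that boundary behavior with the standard library's `bisect_left`; the `risk_band` function is a hypothetical stand-in for the binning call, shown only to make the edges explicit.

```python
from bisect import bisect_left

UPPER_EDGES = [0.35, 0.65, 0.85]  # inner bin edges used in the tutorial
LABELS = ["low", "medium", "high", "critical"]

def risk_band(max_risk):
    """Map a score in [0, 1] to a band using right-closed intervals."""
    return LABELS[bisect_left(UPPER_EDGES, max_risk)]

for score in (0.10, 0.35, 0.36, 0.85, 0.97):
    print(score, risk_band(score))
```

Only scores strictly above 0.85 land in the "critical" band, which is the band the notebook recolors red.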
Building Interactive Graph Visualizations
base_g = (
    graphistry
    .bind(source="src", destination="dst", node="id")
    .edges(edges_df)
    .nodes(nodes_df)
    .bind(
        edge="edge_id",
        edge_title="edge_title",
        edge_label="edge_label",
        edge_weight="event_count",
        edge_size="edge_size",
        point_title="point_title",
        point_label="label",
        point_color="node_color",
        point_size="node_size",
        point_x="x",
        point_y="y"
    )
    .settings(url_params={"play": 0, "info": "true"})
)
print("\nConstructed a PyGraphistry Plotter named base_g.")
print("It binds src/dst edges, node attributes, titles, labels, sizes, colors, and external x/y layout.")
try:
    dot_text = base_g.plot_static(engine="graphviz-dot", reuse_layout=True)
    dot_path = OUT_DIR / "graph_static.dot"
    with open(dot_path, "w") as f:
        f.write(dot_text if isinstance(dot_text, str) else str(dot_text))
    print("Saved DOT representation:", dot_path)
except Exception as e:
    print("Static DOT export skipped:", repr(e))

def show_pyvis(nodes, edges, output_path, height="780px"):
    # Render a local interactive HTML graph; cap the node count so the browser stays responsive.
    nodes_small = nodes.copy()
    edges_small = edges.copy()
    max_nodes = 320
    if len(nodes_small) > max_nodes:
        keep = set(
            nodes_small
            .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
            .head(max_nodes)["id"]
        )
        nodes_small = nodes_small[nodes_small["id"].isin(keep)]
        edges_small = edges_small[edges_small["src"].isin(keep) & edges_small["dst"].isin(keep)]
    net = Network(
        height=height,
        width="100%",
        directed=True,
        notebook=True,
        cdn_resources="in_line"
    )
    net.barnes_hut(gravity=-25000, central_gravity=0.2, spring_length=160, spring_strength=0.04, damping=0.92)
    for row in nodes_small.itertuples(index=False):
        # Convert the HTML tooltip into plain text for PyVis.
        title = str(row.point_title).replace("<br>", "\n").replace("<b>", "").replace("</b>", "")
        net.add_node(
            row.id,
            label=str(row.label),
            title=title,
            group=str(row.entity_type),
            value=float(row.node_size)
        )
    for row in edges_small.itertuples(index=False):
        title = str(row.edge_title).replace("<br>", "\n").replace("<b>", "").replace("</b>", "")
        net.add_edge(
            row.src,
            row.dst,
            title=title,
            label=str(row.relation) if row.max_risk >= 0.90 else "",
            value=float(max(1.0, row.edge_size))
        )
    net.write_html(str(output_path), notebook=False)
    display(HTML(filename=str(output_path)))
    print("Saved local interactive HTML:", output_path)

local_full_html = OUT_DIR / "local_full_graph.html"
show_pyvis(nodes_df, edges_df, local_full_html)
# Use the most anomalous node as the seed for a focused two-hop investigation.
seed_node = (
    nodes_df
    .sort_values(["is_anomaly", "anomaly_score", "max_risk", "pagerank"], ascending=[False, False, False, False])
    .iloc[0]["id"]
)
ego = nx.ego_graph(G.to_undirected(), seed_node, radius=2)
ego_nodes = set(ego.nodes())
ego_edges_df = edges_df[edges_df["src"].isin(ego_nodes) & edges_df["dst"].isin(ego_nodes)].copy()
ego_nodes_df = nodes_df[nodes_df["id"].isin(ego_nodes)].copy()
print("\nFocused investigation seed node:", seed_node)
print(f"Ego subgraph nodes: {len(ego_nodes_df):,}")
print(f"Ego subgraph edges: {len(ego_edges_df):,}")
display(
    ego_nodes_df
    .sort_values(["is_anomaly", "anomaly_score", "max_risk"], ascending=[False, False, False])
    [["id", "entity_type", "risk_band", "is_anomaly", "anomaly_score", "max_risk", "degree_w", "pagerank", "community"]]
    .head(30)
)
ego_g = (
    graphistry
    .bind(source="src", destination="dst", node="id")
    .edges(ego_edges_df)
    .nodes(ego_nodes_df)
    .bind(
        edge="edge_id",
        edge_title="edge_title",
        edge_label="edge_label",
        edge_weight="event_count",
        edge_size="edge_size",
        point_title="point_title",
        point_label="label",
        point_color="node_color",
        point_size="node_size",
        point_x="x",
        point_y="y"
    )
    .settings(url_params={"play": 0, "info": "true"})
)
local_ego_html = OUT_DIR / "local_ego_investigation_graph.html"
show_pyvis(ego_nodes_df, ego_edges_df, local_ego_html)
# Keep edges that are high risk, failure-heavy, or involve impossible travel.
risky_edges_df = edges_df[
    (edges_df["max_risk"] >= 0.85)
    | (edges_df["failed_count"] >= edges_df["failed_count"].quantile(0.95))
    | (edges_df["impossible_travel_count"] > 0)
].copy()
risky_node_ids = set(risky_edges_df["src"]).union(set(risky_edges_df["dst"]))
risky_nodes_df = nodes_df[nodes_df["id"].isin(risky_node_ids)].copy()
risky_g = (
    graphistry
    .bind(source="src", destination="dst", node="id")
    .edges(risky_edges_df)
    .nodes(risky_nodes_df)
    .bind(
        edge="edge_id",
        edge_title="edge_title",
        edge_label="edge_label",
        edge_weight="event_count",
        edge_size="edge_size",
        point_title="point_title",
        point_label="label",
        point_color="node_color",
        point_size="node_size",
        point_x="x",
        point_y="y"
    )
    .settings(url_params={"play": 0, "info": "true"})
)
print("\nHigh-risk filtered graph:")
print(f"Risky nodes: {len(risky_nodes_df):,}")
print(f"Risky edges: {len(risky_edges_df):,}")
local_risky_html = OUT_DIR / "local_high_risk_graph.html"
show_pyvis(risky_nodes_df, risky_edges_df, local_risky_html)
We create PyGraphistry plot objects by binding source and destination nodes, node IDs, labels, colors, sizes, tooltips, and layout coordinates. We also generate local PyVis HTML visualizations so we can inspect the full graph, a focused ego investigation graph, and a high-risk filtered graph without requiring Graphistry credentials. We use these views to move from broad graph exploration to targeted investigation of suspicious entities.
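The ego investigation graph keeps every node within two hops of the seed, ignoring edge direction. The following is a plain-Python sketch of that idea using breadth-first search; `ego_nodes` and the toy adjacency data are hypothetical, and the notebook itself relies on NetworkX for this step.

```python
from collections import deque

def ego_nodes(adjacency, seed, radius=2):
    """Return all nodes within `radius` hops of `seed` in an undirected graph."""
    seen = {seed: 0}  # node -> hop distance from the seed
    queue = deque([seed])
    while queue:
        node = queue.popleft()
        if seen[node] == radius:
            continue  # do not expand beyond the radius
        for neighbor in adjacency.get(node, ()):
            if neighbor not in seen:
                seen[neighbor] = seen[node] + 1
                queue.append(neighbor)
    return set(seen)

# Toy graph: user - device - ip - geo forms a chain, plus one service edge.
edges = [("user:u1", "device:d1"), ("device:d1", "ip:10.0.0.1"),
         ("ip:10.0.0.1", "geo:US"), ("user:u1", "service:vault")]
adjacency = {}
for a, b in edges:
    adjacency.setdefault(a, set()).add(b)
    adjacency.setdefault(b, set()).add(a)

print(sorted(ego_nodes(adjacency, "user:u1", radius=2)))
```

An edge is then kept only if both of its endpoints are in the returned set, which is the same filter the notebook applies to `edges_df`.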
Exporting Hypergraphs and Artifacts
try:
    hypergraph_input = events_df[[
        "event_id", "user", "device", "ip", "service", "role", "geo",
        "risk_score", "amount", "is_success", "off_hours", "impossible_travel"
    ]].head(450).copy()
    hg = graphistry.hypergraph(
        hypergraph_input,
        ["user", "device", "ip", "service", "role", "geo"]
    )
    hyper_g = hg["graph"]
    print("\nConstructed a PyGraphistry hypergraph from raw event rows.")
    print("Hypergraph keys:", list(hg.keys()))
except Exception as e:
    hyper_g = None
    print("\nHypergraph transform skipped:", repr(e))
if REGISTERED:
    print("\nUploading interactive visualizations to Graphistry...")
    try:
        full_url = base_g.plot(render=False)
        print("Full graph URL:", full_url)
        display(IFrame(full_url, width="100%", height=780))
    except Exception as e:
        print("Full Graphistry upload failed:", repr(e))
    try:
        ego_url = ego_g.plot(render=False)
        print("Ego investigation graph URL:", ego_url)
        display(IFrame(ego_url, width="100%", height=780))
    except Exception as e:
        print("Ego Graphistry upload failed:", repr(e))
    try:
        risky_url = risky_g.plot(render=False)
        print("High-risk graph URL:", risky_url)
        display(IFrame(risky_url, width="100%", height=780))
    except Exception as e:
        print("Risky Graphistry upload failed:", repr(e))
    if hyper_g is not None:
        try:
            hyper_url = hyper_g.plot(render=False)
            print("Hypergraph URL:", hyper_url)
            display(IFrame(hyper_url, width="100%", height=780))
        except Exception as e:
            print("Hypergraph Graphistry upload failed:", repr(e))
else:
    print("\nGraphistry upload skipped because credentials are not configured.")
    print("Local HTML visualizations were still generated and displayed.")
events_path = OUT_DIR / "events.csv"
raw_edges_path = OUT_DIR / "raw_edges.parquet"
edges_path = OUT_DIR / "aggregated_edges.parquet"
nodes_path = OUT_DIR / "nodes.parquet"
summary_path = OUT_DIR / "investigation_summary.json"
gexf_path = OUT_DIR / "enterprise_access_graph.gexf"
events_df.to_csv(events_path, index=False)
raw_edges_df.to_parquet(raw_edges_path, index=False)
edges_df.to_parquet(edges_path, index=False)
nodes_df.to_parquet(nodes_path, index=False)
nx.write_gexf(G, gexf_path)
summary = {
    "events": int(len(events_df)),
    "raw_edges": int(len(raw_edges_df)),
    "aggregated_edges": int(len(edges_df)),
    "nodes": int(len(nodes_df)),
    "communities": int(len(communities)),
    "layout": layout_name,
    "seed_node_for_ego_investigation": seed_node,
    "compromised_users": sorted(compromised_users),
    "risky_devices": sorted(risky_devices),
    "risky_ips": sorted(risky_ips),
    "top_anomalies": top_anomalies.to_dict(orient="records"),
    "top_risky_edges": top_edges.to_dict(orient="records"),
    "outputs": {
        "events_csv": str(events_path),
        "raw_edges_parquet": str(raw_edges_path),
        "aggregated_edges_parquet": str(edges_path),
        "nodes_parquet": str(nodes_path),
        "gexf": str(gexf_path),
        "local_full_graph_html": str(local_full_html),
        "local_ego_graph_html": str(local_ego_html),
        "local_high_risk_graph_html": str(local_risky_html)
    }
}
with open(summary_path, "w") as f:
    # default=str guards against values json cannot serialize natively, such as NumPy scalars.
    json.dump(summary, f, indent=2, default=str)
print("\nSaved tutorial artifacts:")
for k, v in summary["outputs"].items():
    print(f"{k}: {v}")
print("summary_json:", summary_path)
print("\nNotebook next steps:")
print("1. Open the local HTML graphs above to inspect communities, anomalies, risky IPs, and suspicious user-service paths.")
print("2. Add Graphistry credentials as Colab secrets to enable GPU-backed Graphistry Hub uploads.")
print("3. Replace the synthetic events_df with your own access logs, transactions, security alerts, or entity relationship table.")
print("4. Keep the same edges_df/nodes_df schema to reuse the analytics and visualization pipeline.")
We create a PyGraphistry hypergraph from raw event rows, illustrating another way to convert tabular data into graph form. We optionally upload the full graph, ego graph, risky graph, and hypergraph to Graphistry Hub when credentials are available. We finally export all important artifacts, including CSV, Parquet, GEXF, HTML, and JSON files, so that we can reuse the results for further analysis.
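Conceptually, a hypergraph transform turns each row into an event node and links it to one node per selected column value. The sketch below shows that idea in plain Python; the ID formats and the `rows_to_hyperedges` helper are assumptions for illustration and do not reproduce the exact identifiers PyGraphistry generates.

```python
def rows_to_hyperedges(rows, entity_cols, id_col="event_id"):
    """Link each row's event node to one entity node per selected column."""
    edges = []
    for row in rows:
        event_node = f"event:{row[id_col]}"
        for col in entity_cols:
            edges.append({
                "src": event_node,
                "dst": f"{col}:{row[col]}",
                "edge_type": col,
            })
    return edges

# Made-up rows shaped like the tutorial's events table.
rows = [
    {"event_id": "evt_00001", "user": "user_001", "ip": "10.0.1.7", "service": "vault"},
    {"event_id": "evt_00002", "user": "user_001", "ip": "10.0.2.9", "service": "jira"},
]
hyperedges = rows_to_hyperedges(rows, ["user", "ip", "service"])
nodes = {e["src"] for e in hyperedges} | {e["dst"] for e in hyperedges}
print(len(hyperedges), "edges,", len(nodes), "nodes")
```

Compared with the hand-built edge table, this representation keeps every event as its own node, so two entities are connected only through the events they share.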
Conclusion
In conclusion, we completed an end-to-end PyGraphistry pipeline that transforms raw event-style data into a fully enriched, analyzable graph. We built meaningful relationships, computed graph features, identified anomalous entities, created focused investigation subgraphs, and exported reusable artifacts for further analysis. We also made the workflow flexible by supporting both local HTML visualization and optional Graphistry Hub uploads, so it runs in Google Colab with or without credentials. We now have a strong foundation for applying PyGraphistry to real-world use cases such as fraud detection, cybersecurity investigation, access monitoring, entity resolution, and graph-based risk intelligence.
The post PyGraphistry Implementation Workflow for Interactive Graph Intelligence Pipelines in Security Analytics and Risk Investigation appeared first on MarkTechPost.
