How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection
In this tutorial, we build a realistic Zero-Trust network simulation by modeling a micro-segmented environment as a directed graph and forcing every request to earn access through continuous verification. We implement a dynamic policy engine that blends ABAC-style permissions with device posture, MFA, path reachability, zone sensitivity, and live risk signals such as anomaly and data-volume indicators. We then operationalize the model through a Flask API and run mixed traffic, including insider lateral movement and exfiltration attempts, to show how trust scoring, adaptive controls, and automated quarantines block malicious flows in real time.
!pip -q install networkx flask

import math
import json
import time
import random
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Any, List, Tuple, Optional

import networkx as nx
from flask import Flask, request, jsonify
import matplotlib.pyplot as plt

def _sigmoid(x: float) -> float:
    # Squash an unbounded score into (0, 1).
    return 1.0 / (1.0 + math.exp(-x))

def _clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
    return max(lo, min(hi, x))

def _now_ts() -> float:
    return time.time()

def _stable_hash(s: str) -> int:
    # Process-independent integer derived from SHA-256, used to seed RNGs.
    h = hashlib.sha256(s.encode("utf-8")).hexdigest()
    return int(h[:10], 16)

def _rand_choice_weighted(items: List[Any], weights: List[float]) -> Any:
    return random.choices(items, weights=weights, k=1)[0]

def _pretty(obj: Any) -> str:
    return json.dumps(obj, indent=2, sort_keys=False)
We set up the environment by installing the required libraries and importing all dependencies needed for graph modeling, risk scoring, and API handling. We define utility functions for trust normalization, hashing, timestamping, and weighted sampling to support deterministic simulations. We prepare helper functions that simplify logging and structured output formatting throughout the tutorial.
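To make the helper behavior concrete, here is a minimal standalone sketch that re-declares three of the utilities and prints a few sample values. The names `sigmoid`, `clamp`, and `stable_hash` (without the leading underscore) are ours, chosen only for illustration, and the sketch assumes nothing beyond the Python standard library.

```python
import hashlib
import math


def sigmoid(x: float) -> float:
    """Map any real number into the open interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))


def clamp(x: float, lo: float = 0.0, hi: float = 1.0) -> float:
    """Pin x into the closed interval [lo, hi]."""
    return max(lo, min(hi, x))


def stable_hash(s: str) -> int:
    """Integer built from the first 10 hex digits of the SHA-256 digest."""
    return int(hashlib.sha256(s.encode("utf-8")).hexdigest()[:10], 16)


# A neutral raw score of zero lands exactly at the midpoint.
print(sigmoid(0.0))

# Out-of-range inputs are pinned to the bounds.
print(clamp(1.7), clamp(-0.3))

# The same string always produces the same seed value.
print(stable_hash("normal0") == stable_hash("normal0"))
```

Unlike the built-in `hash()`, whose string hashes are salted per process by default, a SHA-256-derived integer stays the same between runs, which is what makes it usable as an RNG seed.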
ZONES = ["public", "dmz", "app", "data", "admin"]
SENSITIVITY = {"public": 0.15, "dmz": 0.35, "app": 0.6, "data": 0.85, "admin": 0.95}

ASSETS = {
    "public": ["cdn", "landing", "status"],
    "dmz": ["api_gateway", "waf", "vpn"],
    "app": ["orders_svc", "billing_svc", "ml_inference", "inventory_svc"],
    "data": ["customer_db", "ledger_db", "feature_store"],
    "admin": ["iam", "siem", "backup_vault"]
}

ACTIONS = ["read", "write", "deploy", "admin", "exfiltrate"]
ROLES = ["customer", "employee", "analyst", "engineer", "admin", "secops"]
DEVICE_TYPES = ["managed_laptop", "managed_server", "byod_phone", "unknown_iot"]
NETWORK_CONTEXT = ["corp_lan", "corp_vpn", "public_wifi", "tor_exit"]

@dataclass
class RequestContext:
    # Everything the engine knows about a single access request.
    user: str
    role: str
    device_id: str
    device_type: str
    device_posture: float
    mfa: bool
    source: str
    src_node: str
    dst_node: str
    action: str
    time_bucket: str
    geo_risk: float
    behavior_anomaly: float
    data_volume: float
    reason: str = ""

@dataclass
class Decision:
    allowed: bool
    trust_score: float
    rule_hits: List[str] = field(default_factory=list)
    controls: Dict[str, Any] = field(default_factory=dict)
    explanation: str = ""
    ts: float = field(default_factory=_now_ts)

@dataclass
class PrincipalState:
    user: str
    role: str
    base_risk: float
    last_seen_ts: float
    rolling_denies: int = 0
    rolling_allows: int = 0
    quarantined: bool = False
    compromise_score: float = 0.0

@dataclass
class DeviceState:
    device_id: str
    device_type: str
    owner: str
    posture: float
    attested: bool
    quarantined: bool = False

@dataclass
class FlowRecord:
    ts: float
    ctx: Dict[str, Any]
    decision: Dict[str, Any]
We define the core domain schema, including zones, assets, roles, device types, and contextual signals that shape our Zero-Trust environment. We formalize request, decision, principal, device, and flow record structures using dataclasses to maintain clarity and state integrity. We establish the foundational data model that enables continuous trust evaluation across identities, devices, and network paths.
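One detail of the schema worth illustrating is the use of `field(default_factory=...)` for the list and dict fields. The sketch below uses a trimmed-down, hypothetical `MiniDecision` (not part of the tutorial code) to show that each instance gets its own containers, so rule hits recorded for one decision never leak into another.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniDecision:  # hypothetical, reduced stand-in for Decision
    allowed: bool
    trust_score: float
    # default_factory builds a fresh container for every instance
    rule_hits: List[str] = field(default_factory=list)
    controls: Dict[str, Any] = field(default_factory=dict)


first = MiniDecision(allowed=False, trust_score=0.31)
second = MiniDecision(allowed=True, trust_score=0.88)

# Mutating one decision leaves the other untouched.
first.rule_hits.append("abac_denied")
first.controls["rate_limit"] = True

print(second.rule_hits, second.controls)

# asdict() gives a plain dict that is easy to log or serialize.
print(asdict(first))
```

Plain mutable defaults such as `rule_hits: List[str] = []` are rejected by `dataclass` with a `ValueError`, which is why the factory form is needed here.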
def build_microsegmented_graph(seed: int = 7) -> nx.DiGraph:
    random.seed(seed)
    G = nx.DiGraph()

    # One node per zone, carrying the zone's baseline sensitivity.
    for z in ZONES:
        G.add_node(f"zone:{z}", kind="zone", zone=z, sensitivity=SENSITIVITY[z])

    # Asset nodes inherit the zone sensitivity with a small random jitter.
    for z, assets in ASSETS.items():
        for a in assets:
            node = f"{z}:{a}"
            G.add_node(node, kind="asset", zone=z, sensitivity=SENSITIVITY[z] + random.uniform(-0.05, 0.05))
            G.add_edge(f"zone:{z}", node, kind="contains")

    allowed_paths = [
        ("public", "dmz"),
        ("dmz", "app"),
        ("app", "data"),
        ("admin", "app"),
        ("admin", "data"),
        ("admin", "dmz"),
        ("dmz", "admin")
    ]

    for src_z, dst_z in allowed_paths:
        G.add_edge(f"zone:{src_z}", f"zone:{dst_z}", kind="zone_route", base_allowed=True)

    # Randomly wire service-to-service calls along the allowed zone routes.
    for src_z, dst_z in allowed_paths:
        for src_a in ASSETS[src_z]:
            for dst_a in ASSETS[dst_z]:
                if random.random() < 0.45:
                    G.add_edge(f"{src_z}:{src_a}", f"{dst_z}:{dst_a}", kind="service_call", base_allowed=True)

    # Some assets also call themselves (self-loops).
    for z in ZONES:
        for a in ASSETS[z]:
            if random.random() < 0.35:
                G.add_edge(f"{z}:{a}", f"{z}:{a}", kind="self", base_allowed=True)

    return G

def draw_graph(G: nx.DiGraph, title: str = "Zero-Trust Microsegmented Network Graph") -> None:
    plt.figure(figsize=(14, 9))
    pos = nx.spring_layout(G, seed=42, k=0.35)
    kinds = nx.get_node_attributes(G, "kind")
    node_colors = []
    for n in G.nodes():
        if kinds.get(n) == "zone":
            node_colors.append(0.85)
        else:
            node_colors.append(G.nodes[n].get("sensitivity", 0.5))
    nx.draw_networkx_nodes(G, pos, node_size=350, node_color=node_colors)
    nx.draw_networkx_edges(G, pos, arrows=True, alpha=0.25)
    nx.draw_networkx_labels(G, pos, font_size=8)
    plt.title(title)
    plt.axis("off")
    plt.show()
We construct a micro-segmented directed network graph where zones and assets are explicitly modeled with sensitivity attributes. We programmatically generate inter-zone and service-level communication paths to simulate realistic enterprise traffic patterns. We visualize the network topology to clearly observe segmentation boundaries and potential lateral movement routes.
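To see what directed segmentation implies for lateral movement, the sketch below checks zone-to-zone reachability over the same `allowed_paths` list. It deliberately swaps in a plain-Python breadth-first search instead of NetworkX so it runs with the standard library alone; `ZONE_ROUTES`, `build_adjacency`, and `reachable` are illustrative names, and the sketch covers only the zone-level routes, not the randomly generated asset-level edges.

```python
from collections import deque
from typing import Dict, List, Tuple

# Directed zone routes, copied from allowed_paths in the graph builder.
ZONE_ROUTES: List[Tuple[str, str]] = [
    ("public", "dmz"), ("dmz", "app"), ("app", "data"),
    ("admin", "app"), ("admin", "data"), ("admin", "dmz"), ("dmz", "admin"),
]


def build_adjacency(routes: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Turn a list of (src, dst) pairs into an adjacency list."""
    adj: Dict[str, List[str]] = {}
    for src, dst in routes:
        adj.setdefault(src, []).append(dst)
        adj.setdefault(dst, [])  # make sure sink zones appear as keys
    return adj


def reachable(adj: Dict[str, List[str]], src: str, dst: str) -> bool:
    """Breadth-first search: is there any directed path from src to dst?"""
    if src not in adj or dst not in adj:
        return False
    seen = {src}
    queue = deque([src])
    while queue:
        node = queue.popleft()
        if node == dst:
            return True
        for nxt in adj[node]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return False


adj = build_adjacency(ZONE_ROUTES)
for pair in [("public", "data"), ("public", "admin"), ("app", "admin"), ("data", "app")]:
    print(pair, reachable(adj, *pair))
```

Because reachability is transitive, `public` can reach `admin` through `dmz` in this zone-level view, while `data` has no outgoing routes at all. The asset-level result in the tutorial additionally depends on which random `service_call` edges were created.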
class ZeroTrustPolicyEngine:
    def __init__(self, G: nx.DiGraph):
        self.G = G
        self.principals: Dict[str, PrincipalState] = {}
        self.devices: Dict[str, DeviceState] = {}
        self.flow_log: List[FlowRecord] = []
        self.blocked_edges: set = set()
        self.policy_version = "ztpe-v1.3"

        # ABAC table: role -> destination zone -> permitted actions.
        self.role_perms = {
            "customer": {"public": {"read"}, "dmz": {"read"}},
            "employee": {"public": {"read"}, "dmz": {"read"}, "app": {"read", "write"}},
            "analyst": {"public": {"read"}, "dmz": {"read"}, "app": {"read"}, "data": {"read"}},
            "engineer": {"public": {"read"}, "dmz": {"read"}, "app": {"read", "write", "deploy"}, "data": {"read"}},
            "admin": {"public": {"read"}, "dmz": {"read", "write"}, "app": {"read", "write", "deploy", "admin"}, "data": {"read", "write", "admin"}, "admin": {"read", "write", "admin"}},
            "secops": {"public": {"read"}, "dmz": {"read", "write"}, "app": {"read", "admin"}, "data": {"read", "admin"}, "admin": {"read", "admin"}},
        }

        # Signal weights for the logistic trust model.
        self.w = {
            "role_fit": 1.4,
            "device_posture": 1.8,
            "mfa": 1.0,
            "network_context": 1.2,
            "time": 0.6,
            "geo_risk": 1.2,
            "behavior_anomaly": 2.2,
            "data_volume": 1.4,
            "principal_base_risk": 1.3,
            "principal_compromise": 2.0,
            "asset_sensitivity": 1.6,
            "path_validity": 1.5,
            "quarantine": 4.0,
        }

        # Trust thresholds that trigger the adaptive controls.
        self.thresholds = {
            "allow": 0.72,
            "step_up": 0.62,
            "rate_limit": 0.55,
            "deny": 0.0
        }
    def register_principal(self, user: str, role: str, base_risk: float) -> None:
        self.principals[user] = PrincipalState(
            user=user,
            role=role,
            base_risk=_clamp(base_risk),
            last_seen_ts=_now_ts()
        )

    def register_device(self, device_id: str, device_type: str, owner: str, posture: float, attested: bool) -> None:
        self.devices[device_id] = DeviceState(
            device_id=device_id,
            device_type=device_type,
            owner=owner,
            posture=_clamp(posture),
            attested=bool(attested)
        )

    def _asset_zone_and_sensitivity(self, node: str) -> Tuple[str, float]:
        if node.startswith("zone:"):
            z = node.split(":", 1)[1]
            return z, SENSITIVITY.get(z, 0.5)
        z = self.G.nodes[node].get("zone", "public")
        sens = float(self.G.nodes[node].get("sensitivity", SENSITIVITY.get(z, 0.5)))
        return z, _clamp(sens)

    def _base_abac_check(self, role: str, dst_zone: str, action: str) -> bool:
        return action in self.role_perms.get(role, {}).get(dst_zone, set())

    def _path_is_valid(self, src: str, dst: str) -> bool:
        # Edges blocked by earlier violations are rejected outright.
        if (src, dst) in self.blocked_edges:
            return False
        try:
            return nx.has_path(self.G, src, dst)
        except (nx.NetworkXError, nx.NodeNotFound):
            # NodeNotFound (unknown node) is not a NetworkXError subclass.
            return False

    def _network_context_risk(self, source: str) -> float:
        table = {"corp_lan": 0.1, "corp_vpn": 0.25, "public_wifi": 0.65, "tor_exit": 0.9}
        return table.get(source, 0.6)

    def _time_risk(self, time_bucket: str) -> float:
        return 0.15 if time_bucket == "business_hours" else 0.55
    def _compute_trust_score(self, ctx: RequestContext) -> Tuple[float, List[str], Dict[str, Any]]:
        rule_hits = []
        controls: Dict[str, Any] = {}

        principal = self.principals.get(ctx.user)
        device = self.devices.get(ctx.device_id)

        # Unknown identities and devices start from a pessimistic default.
        if principal is None:
            rule_hits.append("unknown_principal")
            principal = PrincipalState(ctx.user, ctx.role, base_risk=0.85, last_seen_ts=_now_ts())

        if device is None:
            rule_hits.append("unknown_device")
            device = DeviceState(ctx.device_id, ctx.device_type, owner=ctx.user, posture=0.25, attested=False)

        src_zone, src_sens = self._asset_zone_and_sensitivity(ctx.src_node)
        dst_zone, dst_sens = self._asset_zone_and_sensitivity(ctx.dst_node)

        abac_ok = self._base_abac_check(ctx.role, dst_zone, ctx.action)
        if not abac_ok:
            rule_hits.append("abac_denied")

        path_ok = self._path_is_valid(ctx.src_node, ctx.dst_node)
        if not path_ok:
            rule_hits.append("invalid_path_or_blocked")

        if principal.quarantined or device.quarantined:
            rule_hits.append("quarantined")
            controls["auto_response"] = "deny_quarantine"

        if ctx.action == "exfiltrate":
            rule_hits.append("exfil_attempt")

        if dst_zone in ["admin", "data"] and not ctx.mfa:
            rule_hits.append("mfa_required_for_sensitive_zone")
            controls["step_up_mfa"] = True

        if device.owner != ctx.user:
            rule_hits.append("device_owner_mismatch")

        net_r = self._network_context_risk(ctx.source)
        t_r = self._time_risk(ctx.time_bucket)

        role_fit = 1.0 if abac_ok else 0.0
        posture = _clamp(device.posture if device.attested else device.posture * 0.75)
        mfa = 1.0 if ctx.mfa else 0.0
        path_valid = 1.0 if path_ok else 0.0

        sens = _clamp(dst_sens)
        principal_risk = _clamp(principal.base_risk)
        compromise = _clamp(principal.compromise_score)
        anomaly = _clamp(ctx.behavior_anomaly)
        geo = _clamp(ctx.geo_risk)
        data_vol = _clamp(ctx.data_volume)

        quarantine_penalty = 1.0 if (principal.quarantined or device.quarantined) else 0.0
        owner_mismatch_penalty = 1.0 if (device.owner != ctx.user) else 0.0
        exfil_penalty = 1.0 if (ctx.action == "exfiltrate") else 0.0

        # Positive signals raise z; risk signals above their baseline lower it.
        z = 0.0
        z += self.w["role_fit"] * (role_fit - 0.5)
        z += self.w["device_posture"] * (posture - 0.5)
        z += self.w["mfa"] * (mfa - 0.5)
        z += self.w["path_validity"] * (path_valid - 0.5)

        z -= self.w["asset_sensitivity"] * (sens - 0.35)
        z -= self.w["network_context"] * (net_r - 0.25)
        z -= self.w["time"] * (t_r - 0.15)
        z -= self.w["geo_risk"] * (geo - 0.2)
        z -= self.w["behavior_anomaly"] * (anomaly - 0.1)
        z -= self.w["data_volume"] * (data_vol - 0.15)
        z -= self.w["principal_base_risk"] * (principal_risk - 0.2)
        z -= self.w["principal_compromise"] * (compromise - 0.0)

        z -= 2.0 * owner_mismatch_penalty
        z -= 2.5 * exfil_penalty
        z -= self.w["quarantine"] * quarantine_penalty

        trust = _sigmoid(z)

        if trust < self.thresholds["rate_limit"]:
            controls["rate_limit"] = True
        if trust < self.thresholds["step_up"]:
            controls["step_up"] = bool(controls.get("step_up_mfa", False) or dst_zone in ["admin", "data"])
        if trust < self.thresholds["allow"]:
            controls["continuous_auth"] = True

        if "abac_denied" in rule_hits or "invalid_path_or_blocked" in rule_hits or "exfil_attempt" in rule_hits:
            controls["risk_signal"] = "policy_violation"

        if anomaly > 0.75 and sens > 0.75:
            controls["auto_response"] = "quarantine_candidate"

        return _clamp(trust), rule_hits, controls
    def evaluate(self, ctx: RequestContext) -> Decision:
        trust, rule_hits, controls = self._compute_trust_score(ctx)

        allowed = trust >= self.thresholds["allow"]

        # A step-up demand without MFA is an automatic deny.
        if controls.get("step_up"):
            if not ctx.mfa:
                allowed = False
                rule_hits.append("step_up_failed_no_mfa")
            else:
                allowed = allowed or (trust >= self.thresholds["step_up"])

        if controls.get("rate_limit") and trust < 0.5:
            allowed = False
            rule_hits.append("rate_limited_denied")

        explanation = self._explain(ctx, trust, allowed, rule_hits, controls)
        dec = Decision(allowed=allowed, trust_score=trust, rule_hits=rule_hits, controls=controls, explanation=explanation)

        self._post_decision_updates(ctx, dec)
        self.flow_log.append(
            FlowRecord(
                ts=dec.ts,
                ctx=ctx.__dict__.copy(),
                decision={
                    "allowed": dec.allowed,
                    "trust_score": dec.trust_score,
                    "rule_hits": dec.rule_hits,
                    "controls": dec.controls,
                    "explanation": dec.explanation
                }
            )
        )
        return dec

    def _explain(self, ctx: RequestContext, trust: float, allowed: bool, hits: List[str], controls: Dict[str, Any]) -> str:
        src_z, _ = self._asset_zone_and_sensitivity(ctx.src_node)
        dst_z, dst_s = self._asset_zone_and_sensitivity(ctx.dst_node)
        bits = []
        bits.append(f"Decision={'ALLOW' if allowed else 'DENY'} | trust={trust:.3f} | {ctx.user}({ctx.role}) {ctx.action} {ctx.src_node}->{ctx.dst_node}")
        bits.append(f"Context: source={ctx.source}, time={ctx.time_bucket}, geo_risk={ctx.geo_risk:.2f}, anomaly={ctx.behavior_anomaly:.2f}, data_vol={ctx.data_volume:.2f}")
        bits.append(f"Zones: {src_z} -> {dst_z} (dst_sensitivity={dst_s:.2f}) | MFA={'yes' if ctx.mfa else 'no'} | posture={ctx.device_posture:.2f}")
        if hits:
            bits.append(f"Rule hits: {', '.join(hits)}")
        if controls:
            bits.append(f"Controls: {controls}")
        return " | ".join(bits)
    def _post_decision_updates(self, ctx: RequestContext, dec: Decision) -> None:
        p = self.principals.get(ctx.user)
        d = self.devices.get(ctx.device_id)

        # First-seen principals and devices are registered on the fly.
        if p is None:
            self.register_principal(ctx.user, ctx.role, base_risk=0.65)
            p = self.principals[ctx.user]
        if d is None:
            self.register_device(ctx.device_id, ctx.device_type, ctx.user, ctx.device_posture, attested=(ctx.device_type.startswith("managed")))
            d = self.devices[ctx.device_id]

        p.last_seen_ts = dec.ts

        if dec.allowed:
            p.rolling_allows += 1
            p.rolling_denies = max(0, p.rolling_denies - 1)
            p.compromise_score = _clamp(p.compromise_score - 0.02)
        else:
            p.rolling_denies += 1
            p.compromise_score = _clamp(p.compromise_score + 0.06 + 0.10 * (1.0 if "exfil_attempt" in dec.rule_hits else 0.0))

        # Quarantine the principal and the device once risk accumulates.
        if dec.controls.get("auto_response") == "quarantine_candidate" or p.rolling_denies >= 4 or p.compromise_score > 0.78:
            p.quarantined = True
            if d:
                d.quarantined = True

        # Block the exact edge used by a policy-violating request.
        if ("invalid_path_or_blocked" in dec.rule_hits) or ("exfil_attempt" in dec.rule_hits) or ("abac_denied" in dec.rule_hits):
            self.blocked_edges.add((ctx.src_node, ctx.dst_node))
    def stats(self) -> Dict[str, Any]:
        total = len(self.flow_log)
        allows = sum(1 for r in self.flow_log if r.decision["allowed"])
        denies = total - allows

        # Count how often each rule appears on denied flows.
        top_denies = {}
        for r in self.flow_log:
            if not r.decision["allowed"]:
                for h in r.decision["rule_hits"]:
                    top_denies[h] = top_denies.get(h, 0) + 1

        principals = {
            u: {
                "role": p.role,
                "base_risk": round(p.base_risk, 3),
                "compromise_score": round(p.compromise_score, 3),
                "rolling_denies": p.rolling_denies,
                "rolling_allows": p.rolling_allows,
                "quarantined": p.quarantined
            }
            for u, p in self.principals.items()
        }

        devices = {
            did: {
                "owner": d.owner,
                "type": d.device_type,
                "posture": round(d.posture, 3),
                "attested": d.attested,
                "quarantined": d.quarantined
            }
            for did, d in self.devices.items()
        }

        return {
            "policy_version": self.policy_version,
            "flows_total": total,
            "flows_allow": allows,
            "flows_deny": denies,
            "deny_reasons_top": dict(sorted(top_denies.items(), key=lambda kv: kv[1], reverse=True)[:10]),
            "blocked_edges_count": len(self.blocked_edges),
            "principals": principals,
            "devices": devices
        }
We implement the dynamic Zero-Trust Policy Engine that evaluates every request using ABAC, contextual risk signals, behavioral anomaly scores, and path validation. We compute a continuous trust score through a weighted risk model and trigger adaptive controls such as step-up authentication, rate limiting, and quarantine. We update the principal and device state after each decision to simulate continuous verification and evolving risk posture.
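The weighted risk model is easier to follow with numbers plugged in. The following standalone sketch condenses the scoring into two lookup tables, reusing the weights and baselines from the engine, and scores two hand-built signal sets. The function name `trust_score`, the table layout, and both sample inputs are our own illustration; the sketch also leaves out the owner-mismatch and quarantine penalties for brevity, so treat it as an approximation of the engine rather than a replacement.

```python
import math
from typing import Dict, Tuple

# signal name -> (weight, baseline); values copied from the engine's weights.
POSITIVE: Dict[str, Tuple[float, float]] = {
    "role_fit": (1.4, 0.5),
    "device_posture": (1.8, 0.5),
    "mfa": (1.0, 0.5),
    "path_validity": (1.5, 0.5),
}
NEGATIVE: Dict[str, Tuple[float, float]] = {
    "asset_sensitivity": (1.6, 0.35),
    "network_context": (1.2, 0.25),
    "time": (0.6, 0.15),
    "geo_risk": (1.2, 0.2),
    "behavior_anomaly": (2.2, 0.1),
    "data_volume": (1.4, 0.15),
    "principal_base_risk": (1.3, 0.2),
    "principal_compromise": (2.0, 0.0),
}
EXFIL_PENALTY = 2.5


def trust_score(signals: Dict[str, float], exfil: bool = False) -> float:
    """Sum weighted deviations from each baseline, then apply a sigmoid."""
    z = 0.0
    for name, (weight, baseline) in POSITIVE.items():
        z += weight * (signals[name] - baseline)
    for name, (weight, baseline) in NEGATIVE.items():
        z -= weight * (signals[name] - baseline)
    if exfil:
        z -= EXFIL_PENALTY
    return 1.0 / (1.0 + math.exp(-z))


# Illustrative inputs: an admin on an attested laptop vs. a risky insider.
admin_read = {
    "role_fit": 1.0, "device_posture": 0.88, "mfa": 1.0, "path_validity": 1.0,
    "asset_sensitivity": 0.85, "network_context": 0.10, "time": 0.15,
    "geo_risk": 0.08, "behavior_anomaly": 0.06, "data_volume": 0.10,
    "principal_base_risk": 0.15, "principal_compromise": 0.0,
}
insider_exfil = {
    "role_fit": 0.0, "device_posture": 0.42 * 0.75, "mfa": 0.0, "path_validity": 1.0,
    "asset_sensitivity": 0.85, "network_context": 0.65, "time": 0.55,
    "geo_risk": 0.62, "behavior_anomaly": 0.92, "data_volume": 0.88,
    "principal_base_risk": 0.55, "principal_compromise": 0.0,
}

admin_trust = trust_score(admin_read)
insider_trust = trust_score(insider_exfil, exfil=True)
print(f"admin read:    {admin_trust:.3f}")
print(f"insider exfil: {insider_trust:.3f}")
```

With these inputs the admin request clears the 0.72 allow threshold, while the insider request falls far below the 0.5 cut-off at which rate-limited traffic is denied. Subtracting a baseline from each signal means that a typical value contributes nothing and only deviations move the score.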
def make_world(engine: ZeroTrustPolicyEngine, seed: int = 13) -> Dict[str, Any]:
    random.seed(seed)

    # (user, role, base_risk)
    users = [
        ("alice", "employee", 0.18),
        ("bob", "engineer", 0.22),
        ("cathy", "analyst", 0.25),
        ("dan", "admin", 0.15),
        ("eve", "secops", 0.10),
        ("mallory", "employee", 0.55)
    ]
    for u, r, br in users:
        engine.register_principal(u, r, br)

    # (device_id, device_type, owner, posture, attested)
    devices = [
        ("dev-alice-lt", "managed_laptop", "alice", 0.82, True),
        ("dev-bob-lt", "managed_laptop", "bob", 0.77, True),
        ("dev-cathy-lt", "managed_laptop", "cathy", 0.74, True),
        ("dev-dan-lt", "managed_laptop", "dan", 0.88, True),
        ("dev-eve-lt", "managed_laptop", "eve", 0.90, True),
        ("dev-mallory-byod", "byod_phone", "mallory", 0.42, False),
        ("unknown-iot-7", "unknown_iot", "unknown", 0.20, False),
    ]
    for did, dt, owner, posture, attested in devices:
        engine.register_device(did, dt, owner, posture, attested)

    all_assets = [n for n in engine.G.nodes() if engine.G.nodes[n].get("kind") == "asset"]
    by_zone = {z: [a for a in all_assets if engine.G.nodes[a].get("zone") == z] for z in ZONES}

    return {"users": users, "devices": devices, "assets": all_assets, "by_zone": by_zone}
def gen_request(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], kind: str = "normal", seed_salt: str = "") -> RequestContext:
    # Per-request RNG seeded from the scenario kind, salt, and a coarse timestamp prefix.
    rnd = random.Random(_stable_hash(kind + seed_salt + str(_now_ts())[:6]))
    users = world["users"]
    by_zone = world["by_zone"]

    def pick_user(role_bias: Optional[str] = None) -> Tuple[str, str]:
        if role_bias:
            filtered = [u for u in users if u[1] == role_bias]
            if filtered:
                u, r, _ = rnd.choice(filtered)
                return u, r
        u, r, _ = rnd.choice(users)
        return u, r

    def user_device(u: str) -> Tuple[str, str, float]:
        candidates = [d for d in engine.devices.values() if d.owner == u]
        if candidates:
            d = rnd.choice(candidates)
        else:
            d = rnd.choice(list(engine.devices.values()))
        return d.device_id, d.device_type, d.posture

    def time_bucket():
        return "business_hours" if rnd.random() < 0.75 else "after_hours"

    source = _rand_choice_weighted(NETWORK_CONTEXT, [0.45, 0.25, 0.22, 0.08])
    geo_risk = _clamp(rnd.uniform(0.05, 0.35) + (0.25 if source in ["public_wifi", "tor_exit"] else 0.0))
    behavior_anomaly = _clamp(rnd.uniform(0.02, 0.25))
    data_volume = _clamp(rnd.uniform(0.02, 0.25))

    if kind == "normal":
        u, r = pick_user()
        did, dt, posture = user_device(u)
        src_zone = _rand_choice_weighted(["public", "dmz", "app"], [0.15, 0.55, 0.30])
        dst_zone = _rand_choice_weighted(["dmz", "app", "data"], [0.35, 0.45, 0.20])
        action = _rand_choice_weighted(ACTIONS, [0.55, 0.28, 0.07, 0.08, 0.02])
        src = rnd.choice(by_zone[src_zone])
        dst = rnd.choice(by_zone[dst_zone])
        mfa = True if dst_zone in ["data", "admin"] else (rnd.random() < 0.55)
        return RequestContext(
            user=u, role=r,
            device_id=did, device_type=dt, device_posture=posture,
            mfa=mfa, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket=time_bucket(),
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="routine_access"
        )

    if kind == "malicious_flow":
        # External attacker: unknown identity and device, hostile network.
        u, r = ("unknown_actor", "customer")
        did, dt, posture = ("unknown-dev", "unknown_iot", 0.18)
        source = _rand_choice_weighted(["tor_exit", "public_wifi"], [0.65, 0.35])
        geo_risk = _clamp(rnd.uniform(0.6, 0.95))
        behavior_anomaly = _clamp(rnd.uniform(0.75, 0.98))
        data_volume = _clamp(rnd.uniform(0.75, 0.98))
        src = rnd.choice(by_zone["public"] + by_zone["dmz"])
        dst = rnd.choice(by_zone["data"] + by_zone["admin"])
        action = _rand_choice_weighted(["write", "admin", "exfiltrate"], [0.25, 0.25, 0.50])
        mfa = False
        return RequestContext(
            user=u, role=r,
            device_id=did, device_type=dt, device_posture=posture,
            mfa=mfa, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket="after_hours",
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="external_malicious_attempt"
        )

    if kind == "insider_threat":
        # Known employee attempting lateral movement and exfiltration.
        u, r = ("mallory", "employee")
        did, dt, posture = user_device(u)
        source = _rand_choice_weighted(["corp_vpn", "public_wifi"], [0.55, 0.45])
        geo_risk = _clamp(rnd.uniform(0.25, 0.65))
        behavior_anomaly = _clamp(rnd.uniform(0.55, 0.95))
        data_volume = _clamp(rnd.uniform(0.55, 0.95))
        src = rnd.choice(by_zone["app"] + by_zone["dmz"])
        dst = rnd.choice(by_zone["data"] + by_zone["admin"])
        action = _rand_choice_weighted(["read", "write", "exfiltrate", "admin"], [0.18, 0.22, 0.45, 0.15])
        mfa = rnd.random() < 0.25
        return RequestContext(
            user=u, role=r,
            device_id=did, device_type=dt, device_posture=posture,
            mfa=mfa, source=source,
            src_node=src, dst_node=dst,
            action=action,
            time_bucket="after_hours",
            geo_risk=geo_risk,
            behavior_anomaly=behavior_anomaly,
            data_volume=data_volume,
            reason="insider_lateral_and_exfil"
        )

    raise ValueError(f"Unknown kind={kind}")
def run_simulation(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], steps: int = 60, seed: int = 99) -> Dict[str, Any]:
    random.seed(seed)
    outcomes = {"allowed": 0, "denied": 0, "samples": []}
    for i in range(steps):
        # Fixed step indices inject attack traffic into the normal flow.
        if i in [12, 13, 14, 28, 29]:
            ctx = gen_request(engine, world, kind="malicious_flow", seed_salt=str(i))
        elif i in [18, 19, 20, 34, 35, 36, 50, 51]:
            ctx = gen_request(engine, world, kind="insider_threat", seed_salt=str(i))
        else:
            ctx = gen_request(engine, world, kind="normal", seed_salt=str(i))

        dec = engine.evaluate(ctx)
        if dec.allowed:
            outcomes["allowed"] += 1
        else:
            outcomes["denied"] += 1

        # Keep the first 10 flows plus up to 18 samples once denials appear.
        if i < 10 or (not dec.allowed and len(outcomes["samples"]) < 18):
            outcomes["samples"].append({"ctx": ctx.__dict__, "decision": dec.__dict__})
    return outcomes
We generate realistic traffic scenarios, including normal business activity, malicious external flows, and insider lateral-movement attempts. We simulate contextual variables, including geo-risk, anomaly scores, and data volume, to stress-test the policy engine. We run multi-step simulations to observe how trust scores shift and how the engine progressively blocks risky behavior.
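To illustrate how blocking becomes progressive, the sketch below replays just the per-principal counters that the engine updates after each decision: a denial adds 0.06 to the compromise score (plus 0.10 for an exfiltration attempt), an allow subtracts 0.02, and quarantine triggers at four rolling denials or a compromise score above 0.78. The function `first_quarantine_step` and the outcome labels are hypothetical names for this example, and the sketch ignores the separate anomaly-based `quarantine_candidate` path.

```python
from typing import List, Optional

# Constants mirrored from the engine's post-decision update logic.
DENY_STEP = 0.06
EXFIL_BONUS = 0.10
ALLOW_DECAY = 0.02
DENY_LIMIT = 4
COMPROMISE_LIMIT = 0.78


def first_quarantine_step(outcomes: List[str]) -> Optional[int]:
    """Return the 1-based step at which quarantine triggers, or None.

    Each outcome is "allow", "deny", or "exfil_deny".
    """
    compromise = 0.0
    rolling_denies = 0
    for step, outcome in enumerate(outcomes, start=1):
        if outcome == "allow":
            rolling_denies = max(0, rolling_denies - 1)
            compromise = max(0.0, compromise - ALLOW_DECAY)
        else:
            rolling_denies += 1
            bump = DENY_STEP + (EXFIL_BONUS if outcome == "exfil_deny" else 0.0)
            compromise = min(1.0, compromise + bump)
        if rolling_denies >= DENY_LIMIT or compromise > COMPROMISE_LIMIT:
            return step
    return None


# A burst of denials trips the rolling-deny counter.
burst = first_quarantine_step(["deny"] * 10)

# Interleaving allowed requests keeps the counter low, but the
# compromise score still accumulates across exfiltration attempts.
slow_and_low = first_quarantine_step(["exfil_deny", "allow"] * 10)

print(burst, slow_and_low)
```

The two triggers complement each other: the rolling counter reacts to bursts, while the slowly decaying compromise score catches an actor who spaces out violations between legitimate requests.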
def make_app(engine: ZeroTrustPolicyEngine, world: Dict[str, Any]) -> Flask:
    app = Flask(__name__)

    @app.get("/health")
    def health():
        return jsonify({"ok": True, "policy_version": engine.policy_version})

    @app.get("/graph")
    def graph():
        nodes = [{"id": n, **engine.G.nodes[n]} for n in engine.G.nodes()]
        edges = [{"src": u, "dst": v, **engine.G.edges[u, v]} for u, v in engine.G.edges()]
        return jsonify({"nodes": nodes, "edges": edges, "blocked_edges": list(map(list, engine.blocked_edges))})

    @app.post("/request")
    def evaluate_request():
        # The JSON body must carry every RequestContext field.
        payload = request.get_json(force=True)
        ctx = RequestContext(**payload)
        dec = engine.evaluate(ctx)
        return jsonify({"allowed": dec.allowed, "trust_score": dec.trust_score, "rule_hits": dec.rule_hits, "controls": dec.controls, "explanation": dec.explanation})

    @app.post("/simulate")
    def simulate():
        payload = request.get_json(force=True) if request.data else {}
        steps = int(payload.get("steps", 50))
        res = run_simulation(engine, world, steps=steps, seed=int(payload.get("seed", 123)))
        return jsonify({"steps": steps, "allowed": res["allowed"], "denied": res["denied"], "stats": engine.stats()})

    @app.get("/stats")
    def stats():
        return jsonify(engine.stats())

    return app
G = build_microsegmented_graph(seed=7)
engine = ZeroTrustPolicyEngine(G)
world = make_world(engine, seed=13)

draw_graph(G, title="Zero-Trust Microsegmented Network (Zones + Assets + Directed Flows)")

app = make_app(engine, world)
client = app.test_client()

print("== Health ==")
print(client.get("/health").json)

print("\n== Run simulation (mix: normal + malicious flows + insider threat) ==")
sim_out = client.post("/simulate", json={"steps": 70, "seed": 2026}).json
print(_pretty({"allowed": sim_out["allowed"], "denied": sim_out["denied"], "blocked_edges_count": sim_out["stats"]["blocked_edges_count"]}))

print("\n== Top deny reasons ==")
print(_pretty(sim_out["stats"]["deny_reasons_top"]))

print("\n== Principal risk snapshot (watch mallory) ==")
principals = sim_out["stats"]["principals"]
focus = {k: principals[k] for k in sorted(principals.keys()) if k in ["alice","bob","cathy","dan","eve","mallory","unknown_actor"]}
print(_pretty(focus))

print("\n== Example: send a direct insider exfil request through the policy API ==")
insider_ctx = gen_request(engine, world, kind="insider_threat", seed_salt="manual-1")
insider_ctx.action = "exfiltrate"
insider_ctx.mfa = False
insider_ctx.behavior_anomaly = 0.92
insider_ctx.data_volume = 0.88
insider_ctx.geo_risk = 0.62
resp = client.post("/request", json=insider_ctx.__dict__).json
print(_pretty(resp))

print("\n== Example: a legitimate admin read with MFA from corp_lan ==")
admin_ctx = RequestContext(
    user="dan", role="admin",
    device_id="dev-dan-lt", device_type="managed_laptop", device_posture=engine.devices["dev-dan-lt"].posture,
    mfa=True, source="corp_lan",
    src_node=random.choice(world["by_zone"]["admin"]),
    dst_node=random.choice(world["by_zone"]["data"]),
    action="read",
    time_bucket="business_hours",
    geo_risk=0.08,
    behavior_anomaly=0.06,
    data_volume=0.10,
    reason="admin_operational_access"
)
resp2 = client.post("/request", json=admin_ctx.__dict__).json
print(_pretty(resp2))

print("\n== Final stats ==")
final_stats = client.get("/stats").json
print(_pretty({
    "flows_total": final_stats["flows_total"],
    "flows_allow": final_stats["flows_allow"],
    "flows_deny": final_stats["flows_deny"],
    "blocked_edges_count": final_stats["blocked_edges_count"],
    "deny_reasons_top": final_stats["deny_reasons_top"]
}))

# Histogram of trust scores over every logged flow.
scores = [r.decision["trust_score"] for r in engine.flow_log]
plt.figure(figsize=(9, 4))
plt.hist(scores, bins=18)
plt.title("Trust Score Distribution Across Simulated Flows")
plt.xlabel("trust_score")
plt.ylabel("count")
plt.show()

denied = [r for r in engine.flow_log if not r.decision["allowed"]]
print("\n== Recent denied explanations (last 6) ==")
for r in denied[-6:]:
    print("-", r.decision["explanation"])
We expose the policy engine through a Flask API and interact with it using a test client to keep the notebook self-contained. We run simulations, inspect trust distributions, analyze denial reasons, and observe quarantine and edge-blocking behavior. We conclude by visualizing trust score patterns and examining denied explanations to validate the Zero-Trust enforcement logic in action.
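The `/request` endpoint builds its context straight from the posted JSON with `RequestContext(**payload)`, so the payload keys must match the dataclass fields exactly. Here is a minimal sketch of that round trip using only the standard library and a hypothetical `MiniContext` with a subset of the fields. The `parse_context` wrapper is our own suggestion for turning a malformed payload into a clear error; the tutorial's handler does not include it.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MiniContext:  # hypothetical subset of the RequestContext fields
    user: str
    role: str
    action: str
    mfa: bool
    reason: str = ""


def parse_context(raw: str) -> MiniContext:
    """Rebuild a context from JSON text, rejecting missing or unknown keys."""
    payload = json.loads(raw)
    try:
        return MiniContext(**payload)
    except TypeError as exc:
        # Raised by the generated __init__ for missing or unexpected keys.
        raise ValueError(f"bad request payload: {exc}") from exc


# Serialize the way a client would, then parse the way a handler would.
original = MiniContext(user="dan", role="admin", action="read", mfa=True)
wire = json.dumps(asdict(original))
restored = parse_context(wire)
print(restored == original)

# A payload that omits required fields is rejected with a readable message.
try:
    parse_context('{"user": "dan", "role": "admin"}')
except ValueError as err:
    print("rejected:", err)
```

In a Flask handler, an error like this could be mapped to a 400 response instead of letting the `TypeError` surface as a server error.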
In conclusion, we demonstrated how Zero Trust becomes a measurable, programmable system when identity, device state, network context, and behavior signals are evaluated together for every interaction. We saw the policy engine deny or step up risky requests, rate-limit low-trust activity, and dynamically block abusive edges to prevent repeated lateral movement and data theft. By combining graph-based segmentation with an evolving trust score and automated responses, we ended with a repeatable framework that we can extend with richer telemetry, better anomaly models, and environment-specific policies while keeping the core “never trust, always verify” loop intact.
The post How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection appeared first on MarkTechPost.
