|

How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection

In this tutorial, we construct a lifelike Zero-Trust community simulation by modeling a micro-segmented setting as a directed graph and forcing each request to earn entry by way of steady verification. We implement a dynamic coverage engine that blends ABAC-style permissions with gadget posture, MFA, path reachability, zone sensitivity, and reside danger alerts similar to anomaly and data-volume indicators. We then operationalize the mannequin by way of a Flask API and run combined site visitors, together with insider-lateral motion and exfiltration makes an attempt, to present how belief scoring, adaptive controls, and automated quarantines block malicious flows in actual time.

!pip -q set up networkx flask


import math
import json
import time
import random
import hashlib
from dataclasses import dataclass, area
from typing import Dict, Any, List, Tuple, Optional


import networkx as nx
from flask import Flask, request, jsonify


import matplotlib.pyplot as plt




def _sigmoid(x: float) -> float:
   return 1.0 / (1.0 + math.exp(-x))


def _clamp(x: float, lo: float = 0.0, hello: float = 1.0) -> float:
   return max(lo, min(hello, x))


def _now_ts() -> float:
   return time.time()


def _stable_hash(s: str) -> int:
   h = hashlib.sha256(s.encode("utf-8")).hexdigest()
   return int(h[:10], 16)


def _rand_choice_weighted(objects: List[Any], weights: List[float]) -> Any:
   return random.decisions(objects, weights=weights, ok=1)[0]


def _pretty(obj: Any) -> str:
   return json.dumps(obj, indent=2, sort_keys=False)

We arrange the setting by putting in the required libraries and importing all dependencies wanted for graph modeling, danger scoring, and API dealing with. We outline utility features for belief normalization, hashing, timestamping, and weighted sampling to help deterministic simulations. We put together helper features that simplify logging and structured output formatting all through the tutorial.

ZONES = ["public", "dmz", "app", "data", "admin"]
SENSITIVITY = {"public": 0.15, "dmz": 0.35, "app": 0.6, "knowledge": 0.85, "admin": 0.95}


ASSETS = {
   "public": ["cdn", "landing", "status"],
   "dmz": ["api_gateway", "waf", "vpn"],
   "app": ["orders_svc", "billing_svc", "ml_inference", "inventory_svc"],
   "knowledge": ["customer_db", "ledger_db", "feature_store"],
   "admin": ["iam", "siem", "backup_vault"]
}      


ACTIONS = ["read", "write", "deploy", "admin", "exfiltrate"]


ROLES = ["customer", "employee", "analyst", "engineer", "admin", "secops"]


DEVICE_TYPES = ["managed_laptop", "managed_server", "byod_phone", "unknown_iot"]
NETWORK_CONTEXT = ["corp_lan", "corp_vpn", "public_wifi", "tor_exit"]


@dataclass
class RequestContext:
   consumer: str
   function: str
   device_id: str
   device_type: str
   device_posture: float
   mfa: bool
   supply: str
   src_node: str
   dst_node: str
   motion: str
   time_bucket: str
   geo_risk: float
   behavior_anomaly: float
   data_volume: float
   motive: str = ""


@dataclass
class Decision:
   allowed: bool
   trust_score: float
   rule_hits: List[str] = area(default_factory=checklist)
   controls: Dict[str, Any] = area(default_factory=dict)
   clarification: str = ""
   ts: float = area(default_factory=_now_ts)


@dataclass
class PrincipalState:
   consumer: str
   function: str
   base_risk: float
   last_seen_ts: float
   rolling_denies: int = 0
   rolling_allows: int = 0
   quarantined: bool = False
   compromise_score: float = 0.0


@dataclass
class DeviceState:
   device_id: str
   device_type: str
   proprietor: str
   posture: float
   attested: bool
   quarantined: bool = False


@dataclass
class FlowRecord:
   ts: float
   ctx: Dict[str, Any]
   resolution: Dict[str, Any]

We outline the core area schema together with zones, belongings, roles, gadget sorts, and contextual alerts that form our Zero-Trust setting. We formalize request, resolution, principal, gadget, and stream document buildings utilizing dataclasses to keep readability and state integrity. We set up the foundational knowledge mannequin that permits steady belief analysis throughout identities, gadgets, and community paths.

def build_microsegmented_graph(seed: int = 7) -> nx.DiGraph:
   random.seed(seed)
   G = nx.DiGraph()


   for z in ZONES:
       G.add_node(f"zone:{z}", variety="zone", zone=z, sensitivity=SENSITIVITY[z])


   for z, belongings in ASSETS.objects():
       for a in belongings:
           node = f"{z}:{a}"
           G.add_node(node, variety="asset", zone=z, sensitivity=SENSITIVITY[z] + random.uniform(-0.05, 0.05))
           G.add_edge(f"zone:{z}", node, variety="accommodates")


   allowed_paths = [
       ("public", "dmz"),
       ("dmz", "app"),
       ("app", "data"),
       ("admin", "app"),
       ("admin", "data"),
       ("admin", "dmz"),
       ("dmz", "admin")
   ]


   for src_z, dst_z in allowed_paths:
       G.add_edge(f"zone:{src_z}", f"zone:{dst_z}", variety="zone_route", base_allowed=True)


   for src_z, dst_z in allowed_paths:
       for src_a in ASSETS[src_z]:
           for dst_a in ASSETS[dst_z]:
               if random.random() < 0.45:
                   G.add_edge(f"{src_z}:{src_a}", f"{dst_z}:{dst_a}", variety="service_call", base_allowed=True)


   for z in ZONES:
       for a in ASSETS[z]:
           if random.random() < 0.35:
               G.add_edge(f"{z}:{a}", f"{z}:{a}", variety="self", base_allowed=True)


   return G


def draw_graph(G: nx.DiGraph, title: str = "Zero-Trust Microsegmented Network Graph") -> None:
   plt.determine(figsize=(14, 9))
   pos = nx.spring_layout(G, seed=42, ok=0.35)
   sorts = nx.get_node_attributes(G, "variety")
   node_colors = []
   for n in G.nodes():
       if sorts.get(n) == "zone":
           node_colors.append(0.85)
       else:
           node_colors.append(G.nodes[n].get("sensitivity", 0.5))
   nx.draw_networkx_nodes(G, pos, node_size=350, node_color=node_colors)
   nx.draw_networkx_edges(G, pos, arrows=True, alpha=0.25)
   nx.draw_networkx_labels(G, pos, font_size=8)
   plt.title(title)
   plt.axis("off")
   plt.present()

We assemble a micro-segmented directed community graph the place zones and belongings are explicitly modeled with sensitivity attributes. We programmatically generate inter-zone and service-level communication paths to simulate lifelike enterprise site visitors patterns. We visualize the community topology to clearly observe segmentation boundaries and potential lateral motion routes.

class ZeroTrustPolicyEngine:
   def __init__(self, G: nx.DiGraph):
       self.G = G
       self.principals: Dict[str, PrincipalState] = {}
       self.gadgets: Dict[str, DeviceState] = {}
       self.flow_log: List[FlowRecord] = []
       self.blocked_edges: set = set()
       self.policy_version = "ztpe-v1.3"


       self.role_perms = {
           "buyer": {"public": {"learn"}, "dmz": {"learn"}},
           "worker": {"public": {"learn"}, "dmz": {"learn"}, "app": {"learn", "write"}},
           "analyst": {"public": {"learn"}, "dmz": {"learn"}, "app": {"learn"}, "knowledge": {"learn"}},
           "engineer": {"public": {"learn"}, "dmz": {"learn"}, "app": {"learn", "write", "deploy"}, "knowledge": {"learn"}},
           "admin": {"public": {"learn"}, "dmz": {"learn", "write"}, "app": {"learn", "write", "deploy", "admin"}, "knowledge": {"learn", "write", "admin"}, "admin": {"learn", "write", "admin"}},
           "secops": {"public": {"learn"}, "dmz": {"learn", "write"}, "app": {"learn", "admin"}, "knowledge": {"learn", "admin"}, "admin": {"learn", "admin"}},
       }


       self.w = {
           "role_fit": 1.4,
           "device_posture": 1.8,
           "mfa": 1.0,
           "network_context": 1.2,
           "time": 0.6,
           "geo_risk": 1.2,
           "behavior_anomaly": 2.2,
           "data_volume": 1.4,
           "principal_base_risk": 1.3,
           "principal_compromise": 2.0,
           "asset_sensitivity": 1.6,
           "path_validity": 1.5,
           "quarantine": 4.0,
       }


       self.thresholds = {
           "permit": 0.72,
           "step_up": 0.62,
           "rate_limit": 0.55,
           "deny": 0.0
       }


   def register_principal(self, consumer: str, function: str, base_risk: float) -> None:
       self.principals[user] = PrincipalState(
           consumer=consumer,
           function=function,
           base_risk=_clamp(base_risk),
           last_seen_ts=_now_ts()
       )


   def register_device(self, device_id: str, device_type: str, proprietor: str, posture: float, attested: bool) -> None:
       self.gadgets[device_id] = DeviceState(
           device_id=device_id,
           device_type=device_type,
           proprietor=proprietor,
           posture=_clamp(posture),
           attested=bool(attested)
       )


   def _asset_zone_and_sensitivity(self, node: str) -> Tuple[str, float]:
       if node.startswith("zone:"):
           z = node.cut up(":", 1)[1]
           return z, SENSITIVITY.get(z, 0.5)
       z = self.G.nodes[node].get("zone", "public")
       sens = float(self.G.nodes[node].get("sensitivity", SENSITIVITY.get(z, 0.5)))
       return z, _clamp(sens)


   def _base_abac_check(self, function: str, dst_zone: str, motion: str) -> bool:
       return motion in self.role_perms.get(function, {}).get(dst_zone, set())


   def _path_is_valid(self, src: str, dst: str) -> bool:
       if (src, dst) in self.blocked_edges:
           return False
       strive:
           return nx.has_path(self.G, src, dst)
       besides nx.NetworkXError:
           return False


   def _network_context_risk(self, supply: str) -> float:
       desk = {"corp_lan": 0.1, "corp_vpn": 0.25, "public_wifi": 0.65, "tor_exit": 0.9}
       return desk.get(supply, 0.6)


   def _time_risk(self, time_bucket: str) -> float:
       return 0.15 if time_bucket == "business_hours" else 0.55


   def _compute_trust_score(self, ctx: RequestContext) -> Tuple[float, List[str], Dict[str, Any]]:
       rule_hits = []
       controls: Dict[str, Any] = {}


       principal = self.principals.get(ctx.consumer)
       gadget = self.gadgets.get(ctx.device_id)


       if principal is None:
           rule_hits.append("unknown_principal")
           principal = PrincipalState(ctx.consumer, ctx.function, base_risk=0.85, last_seen_ts=_now_ts())


       if gadget is None:
           rule_hits.append("unknown_device")
           gadget = DeviceState(ctx.device_id, ctx.device_type, proprietor=ctx.consumer, posture=0.25, attested=False)


       src_zone, src_sens = self._asset_zone_and_sensitivity(ctx.src_node)
       dst_zone, dst_sens = self._asset_zone_and_sensitivity(ctx.dst_node)


       abac_ok = self._base_abac_check(ctx.function, dst_zone, ctx.motion)
       if not abac_ok:
           rule_hits.append("abac_denied")


       path_ok = self._path_is_valid(ctx.src_node, ctx.dst_node)
       if not path_ok:
           rule_hits.append("invalid_path_or_blocked")


       if principal.quarantined or gadget.quarantined:
           rule_hits.append("quarantined")
           controls["auto_response"] = "deny_quarantine"


       if ctx.motion == "exfiltrate":
           rule_hits.append("exfil_attempt")


       if dst_zone in ["admin", "data"] and not ctx.mfa:
           rule_hits.append("mfa_required_for_sensitive_zone")
           controls["step_up_mfa"] = True


       if gadget.proprietor != ctx.consumer:
           rule_hits.append("device_owner_mismatch")


       net_r = self._network_context_risk(ctx.supply)
       t_r = self._time_risk(ctx.time_bucket)


       role_fit = 1.0 if abac_ok else 0.0
       posture = _clamp(gadget.posture if gadget.attested else gadget.posture * 0.75)
       mfa = 1.0 if ctx.mfa else 0.0
       path_valid = 1.0 if path_ok else 0.0
       sens = _clamp(dst_sens)


       principal_risk = _clamp(principal.base_risk)
       compromise = _clamp(principal.compromise_score)
       anomaly = _clamp(ctx.behavior_anomaly)
       geo = _clamp(ctx.geo_risk)
       data_vol = _clamp(ctx.data_volume)


       quarantine_penalty = 1.0 if (principal.quarantined or gadget.quarantined) else 0.0
       owner_mismatch_penalty = 1.0 if (gadget.proprietor != ctx.consumer) else 0.0
       exfil_penalty = 1.0 if (ctx.motion == "exfiltrate") else 0.0


       z = 0.0
       z += self.w["role_fit"] * (role_fit - 0.5)
       z += self.w["device_posture"] * (posture - 0.5)
       z += self.w["mfa"] * (mfa - 0.5)
       z += self.w["path_validity"] * (path_valid - 0.5)


       z -= self.w["asset_sensitivity"] * (sens - 0.35)


       z -= self.w["network_context"] * (net_r - 0.25)
       z -= self.w["time"] * (t_r - 0.15)
       z -= self.w["geo_risk"] * (geo - 0.2)


       z -= self.w["behavior_anomaly"] * (anomaly - 0.1)
       z -= self.w["data_volume"] * (data_vol - 0.15)


       z -= self.w["principal_base_risk"] * (principal_risk - 0.2)
       z -= self.w["principal_compromise"] * (compromise - 0.0)


       z -= 2.0 * owner_mismatch_penalty
       z -= 2.5 * exfil_penalty
       z -= self.w["quarantine"] * quarantine_penalty


       belief = _sigmoid(z)


       if belief < self.thresholds["rate_limit"]:
           controls["rate_limit"] = True
       if belief < self.thresholds["step_up"]:
           controls["step_up"] = bool(controls.get("step_up_mfa", False) or dst_zone in ["admin", "data"])
       if belief < self.thresholds["allow"]:
           controls["continuous_auth"] = True


       if "abac_denied" in rule_hits or "invalid_path_or_blocked" in rule_hits or "exfil_attempt" in rule_hits:
           controls["risk_signal"] = "policy_violation"


       if anomaly > 0.75 and sens > 0.75:
           controls["auto_response"] = "quarantine_candidate"


       return _clamp(belief), rule_hits, controls


   def consider(self, ctx: RequestContext) -> Decision:
       belief, rule_hits, controls = self._compute_trust_score(ctx)
       allowed = belief >= self.thresholds["allow"]


       if controls.get("step_up"):
           if not ctx.mfa:
               allowed = False
               rule_hits.append("step_up_failed_no_mfa")
           else:
               allowed = allowed or (belief >= self.thresholds["step_up"])


       if controls.get("rate_limit") and belief < 0.5:
           allowed = False
           rule_hits.append("rate_limited_denied")


       clarification = self._explain(ctx, belief, allowed, rule_hits, controls)
       dec = Decision(allowed=allowed, trust_score=belief, rule_hits=rule_hits, controls=controls, clarification=clarification)


       self._post_decision_updates(ctx, dec)


       self.flow_log.append(
           FlowRecord(
               ts=dec.ts,
               ctx=ctx.__dict__.copy(),
               resolution={
                   "allowed": dec.allowed,
                   "trust_score": dec.trust_score,
                   "rule_hits": dec.rule_hits,
                   "controls": dec.controls,
                   "clarification": dec.clarification
               }
           )
       )
       return dec


   def _explain(self, ctx: RequestContext, belief: float, allowed: bool, hits: List[str], controls: Dict[str, Any]) -> str:
       src_z, _ = self._asset_zone_and_sensitivity(ctx.src_node)
       dst_z, dst_s = self._asset_zone_and_sensitivity(ctx.dst_node)
       bits = []
       bits.append(f"Decision={'ALLOW' if allowed else 'DENY'} | belief={belief:.3f} | {ctx.consumer}({ctx.function}) {ctx.motion} {ctx.src_node}->{ctx.dst_node}")
       bits.append(f"Context: supply={ctx.supply}, time={ctx.time_bucket}, geo_risk={ctx.geo_risk:.2f}, anomaly={ctx.behavior_anomaly:.2f}, data_vol={ctx.data_volume:.2f}")
       bits.append(f"Zones: {src_z} -> {dst_z} (dst_sensitivity={dst_s:.2f}) | MFA={'sure' if ctx.mfa else 'no'} | posture={ctx.device_posture:.2f}")
       if hits:
           bits.append(f"Rule hits: {', '.be part of(hits)}")
       if controls:
           bits.append(f"Controls: {controls}")
       return " | ".be part of(bits)


   def _post_decision_updates(self, ctx: RequestContext, dec: Decision) -> None:
       p = self.principals.get(ctx.consumer)
       d = self.gadgets.get(ctx.device_id)


       if p is None:
           self.register_principal(ctx.consumer, ctx.function, base_risk=0.65)
           p = self.principals[ctx.user]
       if d is None:
           self.register_device(ctx.device_id, ctx.device_type, ctx.consumer, ctx.device_posture, attested=(ctx.device_type.startswith("managed")))
           d = self.gadgets[ctx.device_id]


       p.last_seen_ts = dec.ts


       if dec.allowed:
           p.rolling_allows += 1
           p.rolling_denies = max(0, p.rolling_denies - 1)
           p.compromise_score = _clamp(p.compromise_score - 0.02)
       else:
           p.rolling_denies += 1
           p.compromise_score = _clamp(p.compromise_score + 0.06 + 0.10 * (1.0 if "exfil_attempt" in dec.rule_hits else 0.0))


       if dec.controls.get("auto_response") == "quarantine_candidate" or p.rolling_denies >= 4 or p.compromise_score > 0.78:
           p.quarantined = True
           if d:
               d.quarantined = True


       if ("invalid_path_or_blocked" in dec.rule_hits) or ("exfil_attempt" in dec.rule_hits) or ("abac_denied" in dec.rule_hits):
           self.blocked_edges.add((ctx.src_node, ctx.dst_node))


   def stats(self) -> Dict[str, Any]:
       whole = len(self.flow_log)
       permits = sum(1 for r in self.flow_log if r.resolution["allowed"])
       denies = whole - permits
       top_denies = {}
       for r in self.flow_log:
           if not r.resolution["allowed"]:
               for h in r.resolution["rule_hits"]:
                   top_denies[h] = top_denies.get(h, 0) + 1
       principals = {
           u: {
               "function": p.function,
               "base_risk": spherical(p.base_risk, 3),
               "compromise_score": spherical(p.compromise_score, 3),
               "rolling_denies": p.rolling_denies,
               "rolling_allows": p.rolling_allows,
               "quarantined": p.quarantined
           }
           for u, p in self.principals.objects()
       }
       gadgets = {
           did: {
               "proprietor": d.proprietor,
               "sort": d.device_type,
               "posture": spherical(d.posture, 3),
               "attested": d.attested,
               "quarantined": d.quarantined
           }
           for did, d in self.gadgets.objects()
       }
       return {
           "policy_version": self.policy_version,
           "flows_total": whole,
           "flows_allow": permits,
           "flows_deny": denies,
           "deny_reasons_top": dict(sorted(top_denies.objects(), key=lambda kv: kv[1], reverse=True)[:10]),
           "blocked_edges_count": len(self.blocked_edges),
           "principals": principals,
           "gadgets": gadgets
       }

We implement the dynamic Zero-Trust Policy Engine that evaluates each request utilizing ABAC, contextual danger alerts, behavioral anomaly scores, and path validation. We compute a steady belief rating by way of a weighted danger mannequin and set off adaptive controls similar to step-up authentication, charge limiting, and quarantine. We replace the principal and gadget state after every resolution to simulate steady verification and evolving danger posture.

def make_world(engine: ZeroTrustPolicyEngine, seed: int = 13) -> Dict[str, Any]:
   random.seed(seed)


   customers = [
       ("alice", "employee", 0.18),
       ("bob", "engineer", 0.22),
       ("cathy", "analyst", 0.25),
       ("dan", "admin", 0.15),
       ("eve", "secops", 0.10),
       ("mallory", "employee", 0.55)
   ]
   for u, r, br in customers:
       engine.register_principal(u, r, br)


   gadgets = [
       ("dev-alice-lt", "managed_laptop", "alice", 0.82, True),
       ("dev-bob-lt", "managed_laptop", "bob", 0.77, True),
       ("dev-cathy-lt", "managed_laptop", "cathy", 0.74, True),
       ("dev-dan-lt", "managed_laptop", "dan", 0.88, True),
       ("dev-eve-lt", "managed_laptop", "eve", 0.90, True),
       ("dev-mallory-byod", "byod_phone", "mallory", 0.42, False),
       ("unknown-iot-7", "unknown_iot", "unknown", 0.20, False),
   ]
   for did, dt, proprietor, posture, attested in gadgets:
       engine.register_device(did, dt, proprietor, posture, attested)


   all_assets = [n for n in engine.G.nodes() if engine.G.nodes[n].get("variety") == "asset"]
   by_zone = {z: [a for a in all_assets if engine.G.nodes[a].get("zone") == z] for z in ZONES}


   return {"customers": customers, "gadgets": gadgets, "belongings": all_assets, "by_zone": by_zone}


def gen_request(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], variety: str = "regular", seed_salt: str = "") -> RequestContext:
   rnd = random.Random(_stable_hash(variety + seed_salt + str(_now_ts())[:6]))


   customers = world["users"]
   by_zone = world["by_zone"]


   def pick_user(role_bias: Optional[str] = None) -> Tuple[str, str]:
       if role_bias:
           filtered = [u for u in users if u[1] == role_bias]
           if filtered:
               u, r, _ = rnd.alternative(filtered)
               return u, r
       u, r, _ = rnd.alternative(customers)
       return u, r


   def user_device(u: str) -> Tuple[str, str, float]:
       candidates = [d for d in engine.devices.values() if d.owner == u]
       if candidates:
           d = rnd.alternative(candidates)
       else:
           d = rnd.alternative(checklist(engine.gadgets.values()))
       return d.device_id, d.device_type, d.posture


   def time_bucket():
       return "business_hours" if rnd.random() < 0.75 else "after_hours"


   supply = _rand_choice_weighted(NETWORK_CONTEXT, [0.45, 0.25, 0.22, 0.08])
   geo_risk = _clamp(rnd.uniform(0.05, 0.35) + (0.25 if supply in ["public_wifi", "tor_exit"] else 0.0))
   behavior_anomaly = _clamp(rnd.uniform(0.02, 0.25))
   data_volume = _clamp(rnd.uniform(0.02, 0.25))


   if variety == "regular":
       u, r = pick_user()
       did, dt, posture = user_device(u)


       src_zone = _rand_choice_weighted(["public", "dmz", "app"], [0.15, 0.55, 0.30])
       dst_zone = _rand_choice_weighted(["dmz", "app", "data"], [0.35, 0.45, 0.20])
       motion = _rand_choice_weighted(ACTIONS, [0.55, 0.28, 0.07, 0.08, 0.02])


       src = rnd.alternative(by_zone[src_zone])
       dst = rnd.alternative(by_zone[dst_zone])


       mfa = True if dst_zone in ["data", "admin"] else (rnd.random() < 0.55)


       return RequestContext(
           consumer=u, function=r,
           device_id=did, device_type=dt, device_posture=posture,
           mfa=mfa, supply=supply,
           src_node=src, dst_node=dst,
           motion=motion,
           time_bucket=time_bucket(),
           geo_risk=geo_risk,
           behavior_anomaly=behavior_anomaly,
           data_volume=data_volume,
           motive="routine_access"
       )


   if variety == "malicious_flow":
       u, r = ("unknown_actor", "buyer")
       did, dt, posture = ("unknown-dev", "unknown_iot", 0.18)


       supply = _rand_choice_weighted(["tor_exit", "public_wifi"], [0.65, 0.35])
       geo_risk = _clamp(rnd.uniform(0.6, 0.95))
       behavior_anomaly = _clamp(rnd.uniform(0.75, 0.98))
       data_volume = _clamp(rnd.uniform(0.75, 0.98))


       src = rnd.alternative(by_zone["public"] + by_zone["dmz"])
       dst = rnd.alternative(by_zone["data"] + by_zone["admin"])
       motion = _rand_choice_weighted(["write", "admin", "exfiltrate"], [0.25, 0.25, 0.50])
       mfa = False


       return RequestContext(
           consumer=u, function=r,
           device_id=did, device_type=dt, device_posture=posture,
           mfa=mfa, supply=supply,
           src_node=src, dst_node=dst,
           motion=motion,
           time_bucket="after_hours",
           geo_risk=geo_risk,
           behavior_anomaly=behavior_anomaly,
           data_volume=data_volume,
           motive="external_malicious_attempt"
       )


   if variety == "insider_threat":
       u, r = ("mallory", "worker")
       did, dt, posture = user_device(u)


       supply = _rand_choice_weighted(["corp_vpn", "public_wifi"], [0.55, 0.45])
       geo_risk = _clamp(rnd.uniform(0.25, 0.65))
       behavior_anomaly = _clamp(rnd.uniform(0.55, 0.95))
       data_volume = _clamp(rnd.uniform(0.55, 0.95))


       src = rnd.alternative(by_zone["app"] + by_zone["dmz"])
       dst = rnd.alternative(by_zone["data"] + by_zone["admin"])
       motion = _rand_choice_weighted(["read", "write", "exfiltrate", "admin"], [0.18, 0.22, 0.45, 0.15])


       mfa = rnd.random() < 0.25


       return RequestContext(
           consumer=u, function=r,
           device_id=did, device_type=dt, device_posture=posture,
           mfa=mfa, supply=supply,
           src_node=src, dst_node=dst,
           motion=motion,
           time_bucket="after_hours",
           geo_risk=geo_risk,
           behavior_anomaly=behavior_anomaly,
           data_volume=data_volume,
           motive="insider_lateral_and_exfil"
       )


   elevate ValueError(f"Unknown variety={variety}")




def run_simulation(engine: ZeroTrustPolicyEngine, world: Dict[str, Any], steps: int = 60, seed: int = 99) -> Dict[str, Any]:
   random.seed(seed)
   outcomes = {"allowed": 0, "denied": 0, "samples": []}


   for i in vary(steps):
       if i in [12, 13, 14, 28, 29]:
           ctx = gen_request(engine, world, variety="malicious_flow", seed_salt=str(i))
       elif i in [18, 19, 20, 34, 35, 36, 50, 51]:
           ctx = gen_request(engine, world, variety="insider_threat", seed_salt=str(i))
       else:
           ctx = gen_request(engine, world, variety="regular", seed_salt=str(i))


       dec = engine.consider(ctx)
       if dec.allowed:
           outcomes["allowed"] += 1
       else:
           outcomes["denied"] += 1


       if i < 10 or (not dec.allowed and len(outcomes["samples"]) < 18):
           outcomes["samples"].append({"ctx": ctx.__dict__, "resolution": dec.__dict__})


   return outcomes

We generate lifelike site visitors situations, together with regular enterprise exercise, malicious exterior flows, and insider lateral-movement makes an attempt. We simulate contextual variables, together with geo-risk, anomaly scores, and knowledge quantity, to stress-test the coverage engine. We run multi-step simulations to observe how belief scores shift and how the engine progressively blocks dangerous habits.

def make_app(engine: ZeroTrustPolicyEngine, world: Dict[str, Any]) -> Flask:
   app = Flask(__name__)


   @app.get("/well being")
   def well being():
       return jsonify({"okay": True, "policy_version": engine.policy_version})


   @app.get("/graph")
   def graph():
       nodes = [{"id": n, **engine.G.nodes[n]} for n in engine.G.nodes()]
       edges = [{"src": u, "dst": v, **engine.G.edges[u, v]} for u, v in engine.G.edges()]
       return jsonify({"nodes": nodes, "edges": edges, "blocked_edges": checklist(map(checklist, engine.blocked_edges))})


   @app.submit("/request")
   def evaluate_request():
       payload = request.get_json(pressure=True)
       ctx = RequestContext(**payload)
       dec = engine.consider(ctx)
       return jsonify({"allowed": dec.allowed, "trust_score": dec.trust_score, "rule_hits": dec.rule_hits, "controls": dec.controls, "clarification": dec.clarification})


   @app.submit("/simulate")
   def simulate():
       payload = request.get_json(pressure=True) if request.knowledge else {}
       steps = int(payload.get("steps", 50))
       res = run_simulation(engine, world, steps=steps, seed=int(payload.get("seed", 123)))
       return jsonify({"steps": steps, "allowed": res["allowed"], "denied": res["denied"], "stats": engine.stats()})


   @app.get("/stats")
   def stats():
       return jsonify(engine.stats())


   return app




G = build_microsegmented_graph(seed=7)
engine = ZeroTrustPolicyEngine(G)
world = make_world(engine, seed=13)


draw_graph(G, title="Zero-Trust Microsegmented Network (Zones + Assets + Directed Flows)")


app = make_app(engine, world)
shopper = app.test_client()


print("== Health ==")
print(shopper.get("/well being").json)


print("n== Run simulation (combination: regular + malicious flows + insider menace) ==")
sim_out = shopper.submit("/simulate", json={"steps": 70, "seed": 2026}).json
print(_pretty({"allowed": sim_out["allowed"], "denied": sim_out["denied"], "blocked_edges_count": sim_out["stats"]["blocked_edges_count"]}))


print("n== Top deny causes ==")
print(_pretty(sim_out["stats"]["deny_reasons_top"]))


print("n== Principal danger snapshot (watch mallory) ==")
principals = sim_out["stats"]["principals"]
focus = {ok: principals[k] for ok in sorted(principals.keys()) if ok in ["alice","bob","cathy","dan","eve","mallory","unknown_actor"]}
print(_pretty(focus))


print("n== Example: ship a direct insider exfil request through the coverage API ==")
insider_ctx = gen_request(engine, world, variety="insider_threat", seed_salt="manual-1")
insider_ctx.motion = "exfiltrate"
insider_ctx.mfa = False
insider_ctx.behavior_anomaly = 0.92
insider_ctx.data_volume = 0.88
insider_ctx.geo_risk = 0.62


resp = shopper.submit("/request", json=insider_ctx.__dict__).json
print(_pretty(resp))


print("n== Example: a respectable admin learn with MFA from corp_lan ==")
admin_ctx = RequestContext(
   consumer="dan", function="admin",
   device_id="dev-dan-lt", device_type="managed_laptop", device_posture=engine.gadgets["dev-dan-lt"].posture,
   mfa=True, supply="corp_lan",
   src_node=random.alternative(world["by_zone"]["admin"]),
   dst_node=random.alternative(world["by_zone"]["data"]),
   motion="learn",
   time_bucket="business_hours",
   geo_risk=0.08,
   behavior_anomaly=0.06,
   data_volume=0.10,
   motive="admin_operational_access"
)
resp2 = shopper.submit("/request", json=admin_ctx.__dict__).json
print(_pretty(resp2))


print("n== Final stats ==")
final_stats = shopper.get("/stats").json
print(_pretty({
   "flows_total": final_stats["flows_total"],
   "flows_allow": final_stats["flows_allow"],
   "flows_deny": final_stats["flows_deny"],
   "blocked_edges_count": final_stats["blocked_edges_count"],
   "deny_reasons_top": final_stats["deny_reasons_top"]
}))


scores = [r.decision["trust_score"] for r in engine.flow_log]
plt.determine(figsize=(9, 4))
plt.hist(scores, bins=18)
plt.title("Trust Score Distribution Across Simulated Flows")
plt.xlabel("trust_score")
plt.ylabel("depend")
plt.present()


denied = [r for r in engine.flow_log if not r.decision["allowed"]]
print("n== Recent denied explanations (final 6) ==")
for r in denied[-6:]:
   print("-", r.resolution["explanation"])

We expose the coverage engine by way of a Flask API and work together with it utilizing a take a look at shopper to hold the pocket book self-contained. We run simulations, examine belief distributions, analyze denial causes, and observe quarantine and edge-blocking habits. We conclude by visualizing belief rating patterns and analyzing denied explanations to validate the Zero-Trust enforcement logic in motion.

In conclusion, we demonstrated how Zero Trust turns into a measurable, programmable system when identification, gadget state, community context, and habits alerts are evaluated collectively for each interplay. We noticed the coverage engine deny or step up dangerous requests, rate-limit low-trust exercise, and dynamically block abusive edges to stop repeated lateral motion and knowledge theft. By combining graph-based segmentation with an evolving belief rating and automated responses, we ended with a repeatable framework that we will lengthen with richer telemetry, higher anomaly fashions, and environment-specific insurance policies whereas retaining the core “by no means belief, at all times confirm” loop intact.


Check out the Full Codes with NotebookAlso, be at liberty to comply with us on Twitter and don’t overlook to be part of our 150k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Need to accomplice with us for selling your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar and so on.? Connect with us

The submit How to Build a Dynamic Zero-Trust Network Simulation with Graph-Based Micro-Segmentation, Adaptive Policy Engine, and Insider Threat Detection appeared first on MarkTechPost.

Similar Posts