An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls

In this tutorial, we build a governed AI-agent workflow using Microsoft’s Agent Governance Toolkit as the reference point. We create a Colab-ready implementation where agents don’t directly execute tools; instead, every action first passes through a governance layer that checks the agent’s identity, trust score, risk tier, requested tool, action type, sensitivity level, and policy rules. We define a YAML-based policy that controls destructive database operations, external email sending, shell execution, access to sensitive data, and financial transfers. We then wrap each tool with governance logic so that actions can be allowed, denied, sandboxed, or routed through an approval step before execution. We also generate tamper-evident audit records, run policy tests, activate a kill switch, summarize governance decisions, and visualize the relationships between agents, tools, rules, and outcomes as a graph.

import os
import sys
import json
import time
import uuid
import hmac
import hashlib
import random
import shutil
import subprocess
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Callable, Optional
def pip_install(*packages):
   # Install quietly; check=False keeps the notebook running if a package fails.
   subprocess.run(
       [sys.executable, "-m", "pip", "install", "-q", *packages],
       check=False
   )
pip_install("pyyaml", "pandas", "networkx", "matplotlib", "rich")
pip_install("agent-governance-toolkit[full]")
# Third-party imports come after installation so a fresh runtime can resolve them.
import yaml
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich import box
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
console = Console()
REPO_URL = "https://github.com/microsoft/agent-governance-toolkit"
REPO_DIR = "/content/agent-governance-toolkit"
if not os.path.exists(REPO_DIR):
   subprocess.run(["git", "clone", "--depth", "1", REPO_URL, REPO_DIR], check=False)
# Try the official entry point; on failure, record the error instead of stopping.
official_govern = None
official_import_error = None
try:
   from agentmesh.governance import govern as official_govern
except Exception as e:
   official_import_error = repr(e)

We set up the Colab environment by installing the required libraries and importing everything needed for policy handling, auditing, visualization, and data analysis. We also clone the Microsoft Agent Governance Toolkit repository to keep the notebook connected to the original project. We then try to import the official governance function, while keeping the tutorial runnable even if the preview package changes.
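The guarded import described above can be generalized into a small helper. The following is a minimal sketch using only the standard library; `optional_import` is a hypothetical helper name introduced here for illustration and is not part of the toolkit.

```python
import importlib
from typing import Any, Optional, Tuple


def optional_import(module_name: str, attr: str) -> Tuple[Optional[Any], Optional[str]]:
    """Return (attribute, None) on success or (None, error_repr) on failure."""
    try:
        module = importlib.import_module(module_name)
        return getattr(module, attr), None
    except Exception as exc:  # ImportError, AttributeError, or package-level failures
        return None, repr(exc)


# A module that ships with every Python installation.
dumps, err = optional_import("json", "dumps")
print(dumps is not None, err)

# A module that does not exist: execution continues and the reason is recorded.
missing, missing_err = optional_import("no_such_governance_pkg", "govern")
print(missing, missing_err)
```

Returning the error text rather than raising lets a notebook report why the official package was unavailable and still fall back to a local implementation.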

POLICY_PATH = "/content/advanced_agent_policy.yaml"
policy_yaml = """
apiVersion: governance.toolkit/v1
name: advanced-colab-governance-policy
default_action: allow
metadata:
 owner: ai-platform-team
 environment: tutorial
 description: >
   Demonstrates deterministic governance controls for AI agent tool calls.
rules:
 - name: block-destructive-database-actions
   description: "Agents should not perform destructive database operations."
   condition: "action.type in ['drop_table', 'delete_table', 'truncate_table']"
   action: deny
   severity: critical
   owasp_risk: "Tool misuse / Excessive agency"
 - name: require-human-approval-for-email
   description: "External email requires approval before execution."
   condition: "action.type == 'send_email' and action.recipient_domain != 'internal.local'"
   action: require_approval
   approvers: ["security-team", "business-owner"]
   severity: high
   owasp_risk: "Goal hijacking / Unauthorized action"
 - name: sandbox-shell-execution
   description: "Shell commands should run in a sandbox with blocked dangerous commands."
   condition: "action.type == 'shell_exec'"
   action: sandbox
   sandbox:
     blocked_terms: ["rm -rf", "curl http", "wget http", "chmod 777", "sudo"]
     max_runtime_seconds: 2
   severity: high
   owasp_risk: "Tool misuse / Unsafe execution"
 - name: deny-low-trust-agent-sensitive-data
   description: "Low-trust agents cannot access sensitive data."
   condition: "identity.trust_score < 0.65 and action.sensitivity in ['high', 'critical']"
   action: deny
   severity: high
   owasp_risk: "Identity abuse / Data exfiltration"
 - name: require-approval-for-financial-transaction
   description: "Financial transactions above 1000 require approval."
   condition: "action.type == 'transfer_money' and action.amount > 1000"
   action: require_approval
   approvers: ["finance-controller"]
   severity: critical
   owasp_risk: "Excessive agency / Business process compromise"
 - name: rate-limit-high-risk-agent
   description: "High-risk agents are blocked from repeated autonomous actions."
   condition: "identity.risk_tier == 'high' and action.autonomous == True"
   action: deny
   severity: medium
   owasp_risk: "Rogue agent / Cascading failure"
"""
# Write the policy to disk, then reload it so the rest of the tutorial reads the saved file.
with open(POLICY_PATH, "w") as f:
   f.write(policy_yaml)
with open(POLICY_PATH, "r") as f:
   policy = yaml.safe_load(f)

We create a YAML governance policy that defines how agent actions should be handled before execution. We add rules to block destructive database actions, require approval for external emails and financial transfers, sandbox shell commands, and restrict low-trust agents from sensitive data. We then save and reload this policy so the rest of the tutorial can use it as the main governance configuration.
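Before a policy like this is handed to an engine, it can help to check its shape. The sketch below is an illustration under stated assumptions: `validate_policy`, `ALLOWED_ACTIONS`, and `REQUIRED_RULE_KEYS` are hypothetical names, the key names (`name`, `condition`, `action`, `rules`, `default_action`) are taken to match the policy above, and the check runs on an already-parsed dictionary so it needs no YAML library.

```python
from typing import Any, Dict, List

ALLOWED_ACTIONS = {"allow", "deny", "require_approval", "sandbox"}
REQUIRED_RULE_KEYS = {"name", "condition", "action"}


def validate_policy(policy: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the structure looks sane."""
    problems: List[str] = []
    if policy.get("default_action") not in ALLOWED_ACTIONS:
        problems.append("default_action must be one of " + ", ".join(sorted(ALLOWED_ACTIONS)))
    seen = set()
    for index, rule in enumerate(policy.get("rules", [])):
        missing = REQUIRED_RULE_KEYS - set(rule)
        if missing:
            problems.append(f"rule #{index} is missing keys: {sorted(missing)}")
            continue
        if rule["action"] not in ALLOWED_ACTIONS:
            problems.append(f"rule '{rule['name']}' has unknown action '{rule['action']}'")
        if rule["name"] in seen:
            problems.append(f"duplicate rule name '{rule['name']}'")
        seen.add(rule["name"])
    return problems


good = {"default_action": "allow",
        "rules": [{"name": "r1", "condition": "action.type == 'x'", "action": "deny"}]}
bad = {"default_action": "permit",
       "rules": [{"name": "r1", "condition": "True"}]}
print(validate_policy(good))
print(validate_policy(bad))
```

A check of this kind catches a misspelled action or a missing condition at load time, rather than letting the rule silently never match.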

@dataclass
class AgentIdentity:
   agent_id: str
   name: str
   role: str
   owner: str
   trust_score: float
   risk_tier: str
   scopes: List[str]
@dataclass
class GovernanceDecision:
   decision_id: str
   timestamp: str
   policy_name: str
   agent_id: str
   agent_name: str
   tool_name: str
   action: Dict[str, Any]
   decision: str
   matched_rule: Optional[str]
   severity: Optional[str]
   reason: str
   approved_by: Optional[str]
   previous_hash: str
   record_hash: str
class GovernanceDenied(Exception):
   pass
class ApprovalRequired(Exception):
   pass
class SandboxViolation(Exception):
   pass
class DotDict(dict):
   # Allows attribute-style access (action.type); missing keys resolve to None.
   def __getattr__(self, item):
       value = self.get(item)
       if isinstance(value, dict):
           return DotDict(value)
       return value
def safe_eval_condition(condition: str, action: Dict[str, Any], identity: AgentIdentity) -> bool:
   # Builtins are removed to limit what a condition can reach. This is a
   # tutorial-level restriction, not a hardened sandbox for untrusted policies.
   safe_globals = {
       "__builtins__": {},
       "True": True,
       "False": False,
       "None": None,
   }
   safe_locals = {
       "action": DotDict(action),
       "identity": DotDict(asdict(identity)),
   }
   try:
       return bool(eval(condition, safe_globals, safe_locals))
   except Exception:
       # A condition that cannot be evaluated is treated as "rule did not match".
       return False

We define the core data structures for representing agent identities, governance decisions, and governance-related exceptions. We also create a small dot-access dictionary helper so that policy conditions can read values such as action.type and identity.trust_score. We then build a safe condition evaluator that checks whether each policy rule matches the current agent action.
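As an alternative to calling `eval` with restricted globals, a condition can be parsed with the standard `ast` module and walked with an explicit allowlist of node types. The sketch below swaps in that technique; it is not the tutorial's evaluator. `eval_condition` and `_COMPARE` are hypothetical names, and the supported syntax (comparisons, `and`/`or`/`not`, list literals, dotted lookups) is an assumption sized to conditions like the ones in the policy.

```python
import ast
import operator
from typing import Any, Dict

_COMPARE = {
    ast.Eq: operator.eq, ast.NotEq: operator.ne,
    ast.Lt: operator.lt, ast.LtE: operator.le,
    ast.Gt: operator.gt, ast.GtE: operator.ge,
    ast.In: lambda a, b: a in b,
    ast.NotIn: lambda a, b: a not in b,
}


def eval_condition(expr: str, context: Dict[str, Dict[str, Any]]) -> bool:
    """Evaluate a condition using only an allowlisted subset of Python syntax."""

    def walk(node: ast.AST) -> Any:
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.List):
            return [walk(item) for item in node.elts]
        if isinstance(node, ast.Name):
            return context[node.id]  # only names present in context resolve
        if isinstance(node, ast.Attribute):
            base = walk(node.value)
            # Attribute access maps to a dictionary lookup; missing keys give None.
            return base.get(node.attr) if isinstance(base, dict) else None
        if isinstance(node, ast.BoolOp):
            values = [walk(v) for v in node.values]
            return all(values) if isinstance(node.op, ast.And) else any(values)
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
            return not walk(node.operand)
        if isinstance(node, ast.Compare):
            left = walk(node.left)
            for op, comparator in zip(node.ops, node.comparators):
                right = walk(comparator)
                if not _COMPARE[type(op)](left, right):
                    return False
                left = right
            return True
        raise ValueError(f"Unsupported syntax: {type(node).__name__}")

    try:
        return bool(walk(ast.parse(expr, mode="eval")))
    except Exception:
        return False  # any error is treated as "rule did not match"


ctx = {"action": {"type": "transfer_money", "amount": 2500},
       "identity": {"trust_score": 0.88, "risk_tier": "low"}}
matches_transfer = eval_condition("action.type == 'transfer_money' and action.amount > 1000", ctx)
matches_low_trust = eval_condition("identity.trust_score < 0.65", ctx)
matches_call = eval_condition("__import__('os').system('echo hi')", ctx)
print(matches_transfer, matches_low_trust, matches_call)
```

Function calls, subscripts, and lambdas are rejected because they are simply not in the allowlist, so the set of things a policy author can express is visible in one place.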

class TamperEvidentAuditLog:
   def __init__(self, secret: bytes = b"tutorial-secret-key"):
       self.records: List[GovernanceDecision] = []
       self.secret = secret
       self.last_hash = "GENESIS"
   def _hash_record(self, payload: Dict[str, Any], previous_hash: str) -> str:
       # HMAC over the canonical JSON of the payload plus the previous hash.
       canonical = json.dumps(
           {"payload": payload, "previous_hash": previous_hash},
           sort_keys=True,
           default=str
       ).encode()
       return hmac.new(self.secret, canonical, hashlib.sha256).hexdigest()
   def append(
       self,
       policy_name: str,
       identity: AgentIdentity,
       tool_name: str,
       action: Dict[str, Any],
       decision: str,
       matched_rule: Optional[str],
       severity: Optional[str],
       reason: str,
       approved_by: Optional[str] = None
   ) -> GovernanceDecision:
       base_payload = {
           "decision_id": str(uuid.uuid4()),
           "timestamp": datetime.now(timezone.utc).isoformat(),
           "policy_name": policy_name,
           "agent_id": identity.agent_id,
           "agent_name": identity.name,
           "tool_name": tool_name,
           "action": action,
           "decision": decision,
           "matched_rule": matched_rule,
           "severity": severity,
           "reason": reason,
           "approved_by": approved_by,
       }
       record_hash = self._hash_record(base_payload, self.last_hash)
       record = GovernanceDecision(
           **base_payload,
           previous_hash=self.last_hash,
           record_hash=record_hash
       )
       self.records.append(record)
       self.last_hash = record_hash
       return record
   def verify(self) -> bool:
       # Recompute every hash in order; any edit to a record breaks the chain.
       previous = "GENESIS"
       for r in self.records:
           payload = asdict(r)
           record_hash = payload.pop("record_hash")
           previous_hash = payload.pop("previous_hash")
           if previous_hash != previous:
               return False
           expected = self._hash_record(payload, previous_hash)
           if expected != record_hash:
               return False
           previous = record_hash
       return True
   def to_dataframe(self) -> pd.DataFrame:
       return pd.DataFrame([asdict(r) for r in self.records])
audit_log = TamperEvidentAuditLog()

We implement a tamper-evident audit log that records every governance decision made by the system. We use chained hashes, so each new record depends on the previous record, making changes easier to detect. We also add methods to verify the audit chain and convert the records into a dataframe for later analysis.
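To see why chaining makes edits detectable, here is a stripped-down sketch of the same idea using plain dictionaries. The names `chain_hash`, `append`, `verify`, and `SECRET` are illustrative assumptions, and the key is hard-coded only for the demo.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, List

SECRET = b"demo-secret"  # assumption: a real deployment would load this from a secret store


def chain_hash(payload: Dict[str, Any], previous_hash: str) -> str:
    """HMAC-SHA256 over the canonical JSON of the payload and the previous hash."""
    canonical = json.dumps({"payload": payload, "previous_hash": previous_hash},
                           sort_keys=True).encode()
    return hmac.new(SECRET, canonical, hashlib.sha256).hexdigest()


def append(chain: List[Dict[str, Any]], payload: Dict[str, Any]) -> None:
    previous_hash = chain[-1]["record_hash"] if chain else "GENESIS"
    chain.append({"payload": payload, "previous_hash": previous_hash,
                  "record_hash": chain_hash(payload, previous_hash)})


def verify(chain: List[Dict[str, Any]]) -> bool:
    previous_hash = "GENESIS"
    for entry in chain:
        if entry["previous_hash"] != previous_hash:
            return False
        expected = chain_hash(entry["payload"], entry["previous_hash"])
        if not hmac.compare_digest(expected, entry["record_hash"]):
            return False
        previous_hash = entry["record_hash"]
    return True


chain: List[Dict[str, Any]] = []
append(chain, {"agent": "OpsAgent", "decision": "sandbox"})
append(chain, {"agent": "ShadowAgent", "decision": "deny"})
intact = verify(chain)

chain[1]["payload"]["decision"] = "allow"  # simulate someone editing a stored record
tampered = verify(chain)
print(intact, tampered)
```

Because each hash is keyed with the secret, someone who edits a record cannot recompute a valid hash without that key, which is what distinguishes this from a plain SHA-256 chain.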

class TutorialGovernanceEngine:
   def __init__(self, policy: Dict[str, Any], audit_log: TamperEvidentAuditLog):
       self.policy = policy
       self.audit_log = audit_log
       self.kill_switch_enabled = False
       self.error_budget = 5
       self.recent_denials = 0
   def activate_kill_switch(self):
       self.kill_switch_enabled = True
   def deactivate_kill_switch(self):
       self.kill_switch_enabled = False
   def evaluate(
       self,
       identity: AgentIdentity,
       tool_name: str,
       action: Dict[str, Any]
   ) -> GovernanceDecision:
       # The kill switch takes precedence over every policy rule.
       if self.kill_switch_enabled:
           return self.audit_log.append(
               policy_name=self.policy["name"],
               identity=identity,
               tool_name=tool_name,
               action=action,
               decision="deny",
               matched_rule="global-kill-switch",
               severity="critical",
               reason="Global governance kill switch is active."
           )
       # Rules are checked in file order; the first matching rule decides.
       for rule in self.policy.get("rules", []):
           condition = rule.get("condition", "")
           if safe_eval_condition(condition, action, identity):
               rule_action = rule.get("action", "deny")
               matched_rule = rule.get("name")
               severity = rule.get("severity")
               description = rule.get("description", "Policy rule matched.")
               if rule_action == "deny":
                   self.recent_denials += 1
                   return self.audit_log.append(
                       policy_name=self.policy["name"],
                       identity=identity,
                       tool_name=tool_name,
                       action=action,
                       decision="deny",
                       matched_rule=matched_rule,
                       severity=severity,
                       reason=description
                   )
               if rule_action == "require_approval":
                   return self.audit_log.append(
                       policy_name=self.policy["name"],
                       identity=identity,
                       tool_name=tool_name,
                       action=action,
                       decision="require_approval",
                       matched_rule=matched_rule,
                       severity=severity,
                       reason=description
                   )
               if rule_action == "sandbox":
                   # A sandboxed command is denied outright if it contains a blocked term.
                   blocked_terms = rule.get("sandbox", {}).get("blocked_terms", [])
                   command = str(action.get("command", ""))
                   for term in blocked_terms:
                       if term in command:
                           self.recent_denials += 1
                           return self.audit_log.append(
                               policy_name=self.policy["name"],
                               identity=identity,
                               tool_name=tool_name,
                               action=action,
                               decision="deny",
                               matched_rule=matched_rule,
                               severity=severity,
                               reason=f"Sandbox blocked command term: {term}"
                           )
                   return self.audit_log.append(
                       policy_name=self.policy["name"],
                       identity=identity,
                       tool_name=tool_name,
                       action=action,
                       decision="sandbox",
                       matched_rule=matched_rule,
                       severity=severity,
                       reason=description
                   )
       return self.audit_log.append(
           policy_name=self.policy["name"],
           identity=identity,
           tool_name=tool_name,
           action=action,
           decision=self.policy.get("default_action", "allow"),
           matched_rule=None,
           severity=None,
           reason="No policy rule matched. Default action applied."
       )
engine = TutorialGovernanceEngine(policy, audit_log)

We build the main governance engine that compares each agent action against the YAML policy rules. We handle different outcomes such as deny, approval required, sandbox mode, and default allow. We also include a kill switch that immediately blocks all actions when needed.
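Because the engine returns as soon as one rule matches, the order of rules is part of the policy's meaning. The sketch below illustrates that with plain Python predicates instead of YAML conditions; `first_match` and the two sample rules are hypothetical stand-ins, not the engine itself.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Rule = Tuple[str, Callable[[Dict[str, Any], Dict[str, Any]], bool], str]


def first_match(rules: List[Rule], action: Dict[str, Any], identity: Dict[str, Any],
                default: str = "allow", kill_switch: bool = False) -> Tuple[str, Optional[str]]:
    """Return (decision, rule_name); the kill switch is checked before any rule."""
    if kill_switch:
        return "deny", "global-kill-switch"
    for name, predicate, outcome in rules:
        if predicate(action, identity):
            return outcome, name
    return default, None


rules: List[Rule] = [
    ("require-approval-for-email",
     lambda a, i: a.get("type") == "send_email", "require_approval"),
    ("deny-low-trust-sensitive",
     lambda a, i: i["trust_score"] < 0.65 and a.get("sensitivity") in ("high", "critical"),
     "deny"),
]

low_trust = {"trust_score": 0.42}
email = {"type": "send_email", "sensitivity": "critical"}

in_order = first_match(rules, email, low_trust)
reordered = first_match(list(reversed(rules)), email, low_trust)
killed = first_match(rules, email, low_trust, kill_switch=True)
print(in_order, reordered, killed)
```

With the email rule listed first, a low-trust agent sending a critical email is routed to approval rather than denied outright; reversing the order flips that outcome. Placing the strictest deny rules first is one way to avoid such surprises.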

import inspect
def query_database(table: str, operation: str = "select") -> Dict[str, Any]:
   return {
       "status": "success",
       "operation": operation,
       "table": table,
       "rows_returned": random.randint(10, 100)
   }
def send_email(to: str, subject: str, body: str) -> Dict[str, Any]:
   return {
       "status": "sent",
       "to": to,
       "subject": subject,
       "body_preview": body[:80]
   }
def shell_exec(command: str) -> Dict[str, Any]:
   # Demo-only allowlist on the first token; shell=True below is acceptable
   # only because this tutorial runs harmless commands.
   allowed_commands = ["echo", "date", "pwd", "ls"]
   first = command.strip().split()[0] if command.strip() else ""
   if first not in allowed_commands:
       return {
           "status": "blocked_by_tutorial_shell",
           "command": command,
           "reason": "Only harmless demo shell commands are executed."
       }
   result = subprocess.run(
       command,
       shell=True,
       capture_output=True,
       text=True,
       timeout=2
   )
   return {
       "status": "executed",
       "command": command,
       "stdout": result.stdout.strip(),
       "stderr": result.stderr.strip()
   }
def transfer_money(amount: float, destination: str) -> Dict[str, Any]:
   return {
       "status": "transferred",
       "amount": amount,
       "destination": destination
   }
class GovernedTool:
   def __init__(
       self,
       name: str,
       fn: Callable,
       engine: TutorialGovernanceEngine,
       identity: AgentIdentity,
       approval_simulator: Optional[Callable[[GovernanceDecision], bool]] = None
   ):
       self.name = name
       self.fn = fn
       self.engine = engine
       self.identity = identity
       # Without a simulator, every approval request is rejected.
       self.approval_simulator = approval_simulator or (lambda decision: False)
   def __call__(self, **kwargs):
       action = dict(kwargs)
       action.setdefault("autonomous", True)
       decision = self.engine.evaluate(
           identity=self.identity,
           tool_name=self.name,
           action=action
       )
       if decision.decision == "deny":
           raise GovernanceDenied(
               f"Action denied by rule '{decision.matched_rule}': {decision.reason}"
           )
       if decision.decision == "require_approval":
           approved = self.approval_simulator(decision)
           if not approved:
               raise ApprovalRequired(
                   f"Approval required by rule '{decision.matched_rule}': {decision.reason}"
               )
           self.engine.audit_log.append(
               policy_name=self.engine.policy["name"],
               identity=self.identity,
               tool_name=self.name,
               action=action,
               decision="approved",
               matched_rule=decision.matched_rule,
               severity=decision.severity,
               reason="Human approval simulated for tutorial.",
               approved_by="tutorial-approver"
           )
       # Pass only the arguments the tool accepts; governance metadata such as
       # "type" and "sensitivity" would otherwise raise a TypeError in the tool.
       accepted = inspect.signature(self.fn).parameters
       tool_kwargs = {k: v for k, v in kwargs.items() if k in accepted}
       return self.fn(**tool_kwargs)

We define sample tools that represent real agent capabilities, including database access, email sending, shell execution, and money transfer. We then create a governed tool wrapper that ensures every tool call passes through the governance engine first. We ensure denied actions stop immediately, that approval-based actions require a simulated approval, and that only approved or allowed actions reach the actual tool.
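The scenarios later in the tutorial pass governance metadata such as `type` and `sensitivity` in the same keyword arguments as the tool's real parameters, so a wrapper has to separate the two before calling the tool. The following is a minimal sketch of one way to do that with `inspect.signature`; `split_kwargs` is a hypothetical helper, and the `query_database` stub here only mirrors the tutorial's signature.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def split_kwargs(fn: Callable[..., Any],
                 kwargs: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split kwargs into (arguments fn accepts, leftover governance metadata)."""
    params = inspect.signature(fn).parameters
    accepts_var_kw = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    if accepts_var_kw:
        return dict(kwargs), {}  # fn takes **kwargs, so everything is a valid argument
    tool_args = {k: v for k, v in kwargs.items() if k in params}
    metadata = {k: v for k, v in kwargs.items() if k not in params}
    return tool_args, metadata


def query_database(table: str, operation: str = "select") -> Dict[str, Any]:
    return {"table": table, "operation": operation}


call = {"table": "customers", "operation": "select",
        "type": "select", "sensitivity": "medium"}
tool_args, metadata = split_kwargs(query_database, call)
print(query_database(**tool_args))
print(metadata)
```

The governance layer still sees the full dictionary when it evaluates the policy; only the final call into the tool is narrowed to the parameters the function declares.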

research_agent = AgentIdentity(
   agent_id="agent-research-001",
   name="ResearchAgent",
   role="market_research",
   owner="strategy-team",
   trust_score=0.91,
   risk_tier="low",
   scopes=["read_database", "web_search", "internal_email"]
)
ops_agent = AgentIdentity(
   agent_id="agent-ops-002",
   name="OpsAgent",
   role="automation",
   owner="platform-team",
   trust_score=0.72,
   risk_tier="medium",
   scopes=["shell_exec", "read_database"]
)
unknown_agent = AgentIdentity(
   agent_id="agent-shadow-999",
   name="ShadowAgent",
   role="unknown",
   owner="unknown",
   trust_score=0.42,
   risk_tier="high",
   scopes=["unknown"]
)
finance_agent = AgentIdentity(
   agent_id="agent-finance-003",
   name="FinanceAgent",
   role="finance_ops",
   owner="finance-team",
   trust_score=0.88,
   risk_tier="low",
   scopes=["transfer_money", "read_database"]
)
def tutorial_approval_simulator(decision: GovernanceDecision) -> bool:
   # Stand-in for a human approver: transfers up to 5000 and emails without
   # the word "confidential" are approved; everything else is rejected.
   action = decision.action
   if decision.matched_rule == "require-approval-for-financial-transaction":
       return action.get("amount", 0) <= 5000
   if decision.matched_rule == "require-human-approval-for-email":
       return "confidential" not in str(action).lower()
   return False
research_db = GovernedTool(
   name="query_database",
   fn=query_database,
   engine=engine,
   identity=research_agent,
   approval_simulator=tutorial_approval_simulator
)
ops_shell = GovernedTool(
   name="shell_exec",
   fn=shell_exec,
   engine=engine,
   identity=ops_agent,
   approval_simulator=tutorial_approval_simulator
)
shadow_db = GovernedTool(
   name="query_database",
   fn=query_database,
   engine=engine,
   identity=unknown_agent,
   approval_simulator=tutorial_approval_simulator
)
research_email = GovernedTool(
   name="send_email",
   fn=send_email,
   engine=engine,
   identity=research_agent,
   approval_simulator=tutorial_approval_simulator
)
finance_transfer = GovernedTool(
   name="transfer_money",
   fn=transfer_money,
   engine=engine,
   identity=finance_agent,
   approval_simulator=tutorial_approval_simulator
)

We create multiple agents with different roles, trust scores, risk levels, and scopes to simulate a realistic multi-agent environment. We also define an approval simulator that accepts or rejects actions based on simple business logic. We then wrap each tool with the appropriate agent identity so the governance layer can make identity-aware decisions.
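Each agent carries a list of scopes, although the policy conditions shown in this tutorial only read the trust score and risk tier. As a hedged illustration of how scopes could feed an identity-aware check, the sketch below maps tool names to required scopes. `TOOL_TO_SCOPE` and `has_scope` are hypothetical and are not taken from the toolkit or from the tutorial's engine.

```python
from typing import Dict, List

# Hypothetical mapping from tool name to the scope an agent would need.
TOOL_TO_SCOPE: Dict[str, str] = {
    "query_database": "read_database",
    "shell_exec": "shell_exec",
    "transfer_money": "transfer_money",
}


def has_scope(scopes: List[str], tool_name: str) -> bool:
    """Return True only if the tool is known and its required scope is granted."""
    required = TOOL_TO_SCOPE.get(tool_name)
    return required is not None and required in scopes


ops_scopes = ["shell_exec", "read_database"]
shadow_scopes = ["unknown"]

ops_can_shell = has_scope(ops_scopes, "shell_exec")
shadow_can_query = has_scope(shadow_scopes, "query_database")
ops_can_transfer = has_scope(ops_scopes, "transfer_money")
print(ops_can_shell, shadow_can_query, ops_can_transfer)
```

Unknown tools are rejected by default here, so adding a new tool without registering its scope does not silently grant access.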

scenarios = [
   {
       "name": "Safe database read",
       "tool": research_db,
       "kwargs": {
           "table": "customers",
           "operation": "select",
           "type": "select",
           "sensitivity": "medium"
       }
   },
   {
       "name": "Blocked destructive database action",
       "tool": research_db,
       "kwargs": {
           "table": "customers",
           "operation": "drop",
           "type": "drop_table",
           "sensitivity": "critical"
       }
   },
   {
       "name": "External email requiring approval",
       "tool": research_email,
       "kwargs": {
           "to": "[email protected]",
           "recipient_domain": "example.com",
           "subject": "Quarterly update",
           "body": "Sharing a non-confidential quarterly update.",
           "type": "send_email",
           "sensitivity": "medium"
       }
   },
   {
       "name": "External email denied due to approval rejection",
       "tool": research_email,
       "kwargs": {
           "to": "[email protected]",
           "recipient_domain": "example.com",
           "subject": "Confidential strategy",
           "body": "This contains confidential strategy.",
           "type": "send_email",
           "sensitivity": "critical"
       }
   },
   {
       "name": "Safe sandbox shell command",
       "tool": ops_shell,
       "kwargs": {
           "command": "echo Agent governance is active",
           "type": "shell_exec",
           "sensitivity": "low"
       }
   },
   {
       "name": "Dangerous shell command blocked",
       "tool": ops_shell,
       "kwargs": {
           "command": "rm -rf /content/something",
           "type": "shell_exec",
           "sensitivity": "critical"
       }
   },
   {
       "name": "Low-trust agent blocked from sensitive data",
       "tool": shadow_db,
       "kwargs": {
           "table": "executive_compensation",
           "operation": "select",
           "type": "select",
           "sensitivity": "critical"
       }
   },
   {
       "name": "Financial transfer requiring approval",
       "tool": finance_transfer,
       "kwargs": {
           "amount": 2500,
           "destination": "vendor-123",
           "type": "transfer_money",
           "sensitivity": "high"
       }
   },
   {
       "name": "Large financial transfer rejected",
       "tool": finance_transfer,
       "kwargs": {
           "amount": 15000,
           "destination": "vendor-999",
           "type": "transfer_money",
           "sensitivity": "critical"
       }
   },
]
results = []
for scenario in scenarios:
   try:
       output = scenario["tool"](**scenario["kwargs"])
       results.append({
           "scenario": scenario["name"],
           "status": "executed",
           "output": output
       })
   except Exception as e:
       # Denials and pending approvals surface as exceptions from the wrapper.
       results.append({
           "scenario": scenario["name"],
           "status": "blocked_or_pending",
           "error": str(e)
       })
audit_df = audit_log.to_dataframe()
display_cols = [
   "timestamp",
   "agent_name",
   "tool_name",
   "decision",
   "matched_rule",
   "severity",
   "reason",
   "record_hash"
]
display(audit_df[display_cols])
test_cases = [
   {
       "name": "drop_table must be denied",
       "identity": research_agent,
       "tool_name": "query_database",
       "action": {"type": "drop_table", "sensitivity": "critical", "autonomous": True},
       "expected": "deny"
   },
   {
       "name": "safe select should be allowed",
       "identity": research_agent,
       "tool_name": "query_database",
       "action": {"type": "select", "sensitivity": "low", "autonomous": True},
       "expected": "allow"
   },
   {
       "name": "external email should require approval",
       "identity": research_agent,
       "tool_name": "send_email",
       "action": {
           "type": "send_email",
           "recipient_domain": "example.com",
           "sensitivity": "medium",
           "autonomous": True
       },
       "expected": "require_approval"
   },
   {
       "name": "low trust sensitive access denied",
       "identity": unknown_agent,
       "tool_name": "query_database",
       "action": {"type": "select", "sensitivity": "critical", "autonomous": True},
       "expected": "deny"
   },
   {
       "name": "shell command should enter sandbox",
       "identity": ops_agent,
       "tool_name": "shell_exec",
       "action": {
           "type": "shell_exec",
           "command": "echo hello",
           "sensitivity": "low",
           "autonomous": True
       },
       "expected": "sandbox"
   },
]
test_results = []
for test in test_cases:
   decision = engine.evaluate(
       identity=test["identity"],
       tool_name=test["tool_name"],
       action=test["action"]
   )
   passed = decision.decision == test["expected"]
   test_results.append({
       "test": test["name"],
       "expected": test["expected"],
       "actual": decision.decision,
       "passed": passed,
       "matched_rule": decision.matched_rule
   })
test_df = pd.DataFrame(test_results)
display(test_df)
# With the kill switch on, even a safe read is denied and logged.
engine.activate_kill_switch()
try:
   research_db(
       table="customers",
       operation="select",
       type="select",
       sensitivity="low"
   )
except GovernanceDenied as e:
   print(f"Kill switch blocked the call: {e}")
engine.deactivate_kill_switch()
audit_df = audit_log.to_dataframe()
summary = (
   audit_df
   .groupby(["decision", "severity"], dropna=False)
   .size()
   .reset_index(name="count")
   .sort_values("count", ascending=False)
)
display(summary)
agent_summary = (
   audit_df
   .groupby(["agent_name", "decision"])
   .size()
   .reset_index(name="count")
   .sort_values(["agent_name", "count"], ascending=[True, False])
)
display(agent_summary)
decision_counts = audit_df["decision"].value_counts()
plt.figure(figsize=(8, 5))
decision_counts.plot(kind="bar")
plt.title("Governance Decisions Across Agent Actions")
plt.xlabel("Decision")
plt.ylabel("Count")
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
severity_counts = audit_df["severity"].fillna("none").value_counts()
plt.figure(figsize=(8, 5))
severity_counts.plot(kind="bar")
plt.title("Governance Events by Severity")
plt.xlabel("Severity")
plt.ylabel("Count")
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
G = nx.DiGraph()
# One path per audit record: agent -> tool -> decision -> matched rule.
for _, row in audit_df.iterrows():
   agent_node = f"Agent: {row['agent_name']}"
   tool_node = f"Tool: {row['tool_name']}"
   decision_node = f"Decision: {row['decision']}"
   rule_node = f"Rule: {row['matched_rule']}" if pd.notna(row["matched_rule"]) else "Rule: default"
   G.add_node(agent_node, node_type="agent")
   G.add_node(tool_node, node_type="tool")
   G.add_node(decision_node, node_type="decision")
   G.add_node(rule_node, node_type="rule")
   G.add_edge(agent_node, tool_node, relation="calls")
   G.add_edge(tool_node, decision_node, relation="produces")
   G.add_edge(decision_node, rule_node, relation="matched")
plt.figure(figsize=(14, 9))
pos = nx.spring_layout(G, seed=42, k=0.8)
nx.draw_networkx_nodes(G, pos, node_size=1800)
nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle="->", arrowsize=15)
nx.draw_networkx_labels(G, pos, font_size=8)
plt.title("Agent Governance Graph: Agents, Tools, Decisions, and Policy Rules")
plt.axis("off")
plt.tight_layout()
plt.show()
EXPORT_DIR = "/content/agt_tutorial_outputs"
os.makedirs(EXPORT_DIR, exist_ok=True)
audit_json_path = os.path.join(EXPORT_DIR, "tamper_evident_audit_log.json")
audit_csv_path = os.path.join(EXPORT_DIR, "governance_audit_log.csv")
policy_copy_path = os.path.join(EXPORT_DIR, "advanced_agent_policy.yaml")
test_results_path = os.path.join(EXPORT_DIR, "policy_test_results.csv")
# Export the audit log (JSON and CSV), the test results, and a copy of the policy.
with open(audit_json_path, "w") as f:
   json.dump([asdict(r) for r in audit_log.records], f, indent=2, default=str)
audit_df.to_csv(audit_csv_path, index=False)
test_df.to_csv(test_results_path, index=False)
shutil.copy(POLICY_PATH, policy_copy_path)

We run a set of test scenarios that show how the governed system handles safe actions, risky actions, approval flows, and blocked operations. We display the audit log, run policy tests, activate and deactivate the kill switch, and summarize governance decisions with tables and charts. We also create a governance graph and export the audit logs, policy file, and test results as reusable artifacts.
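The same kind of decision summary can be produced without pandas, which is handy when reviewing an exported audit log outside the notebook. The sketch below uses `collections.Counter`; the sample `records` list is made-up illustration data, not output from the tutorial run.

```python
from collections import Counter
from typing import Dict, List

# Illustrative records only; a real run would load these from the exported JSON file.
records: List[Dict[str, str]] = [
    {"agent_name": "ResearchAgent", "decision": "allow"},
    {"agent_name": "ResearchAgent", "decision": "deny"},
    {"agent_name": "OpsAgent", "decision": "sandbox"},
    {"agent_name": "ShadowAgent", "decision": "deny"},
    {"agent_name": "FinanceAgent", "decision": "require_approval"},
]

# Count decisions overall and per (agent, decision) pair.
by_decision = Counter(r["decision"] for r in records)
by_agent = Counter((r["agent_name"], r["decision"]) for r in records)

for decision, count in by_decision.most_common():
    print(f"{decision:18s} {count}")
for (agent, decision), count in sorted(by_agent.items()):
    print(f"{agent:15s} {decision:18s} {count}")
```

Counting by agent and decision together makes it easy to spot an agent whose calls are mostly denied, which is often the first thing a reviewer looks for.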

In conclusion, we have a fully governed agent workflow that covers both policy enforcement and observability. We simulated multiple agents with varying trust levels. We showed how the same system responds differently depending on the agent’s identity, the action’s sensitivity, and the rules defined in the policy file. Safe actions, such as simple database reads, are executed. In contrast, risky actions, such as destructive database changes, unsafe shell commands, low-trust sensitive access, and large financial transfers, are blocked or sent for approval. We also recorded every decision in an audit log, verified the audit chain, ran policy tests, exported governance artifacts, and created visual summaries that make the system’s behavior easier to review.



The post An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls appeared first on MarkTechPost.
