|

A Coding Guide to Build an AI-Powered Cryptographic Agent System with Hybrid Encryption, Digital Signatures, and Adaptive Security Intelligence

🔐

In this tutorial, we construct an AI-powered cryptographic agent system that mixes the power of classical encryption with adaptive intelligence. We design brokers able to performing hybrid encryption with RSA and AES, producing digital signatures, detecting anomalies in message patterns, and intelligently recommending key rotations. As we progress, we witness these autonomous brokers securely set up communication channels, change encrypted information, and constantly assess safety dangers in actual time, all inside a compact, environment friendly implementation. Check out the FULL CODES here.

import hashlib, hmac, json, time, secrets and techniques, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.uneven import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend


@dataclass
class SecurityOccasion:
   timestamp: float
   event_type: str
   risk_score: float
   particulars: Dict

We start by importing all needed libraries for cryptography, AI-based evaluation, and information dealing with. We additionally outline a SecurityOccasion dataclass to file and analyze all vital occasions within the cryptographic system. Check out the FULL CODES here.

class CryptoAgent:
   def __init__(self, agent_id: str):
       self.agent_id = agent_id
       self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
       self.public_key = self.private_key.public_key()
       self.session_keys = {}
       self.security_events = []
       self.encryption_count = 0
       self.key_rotation_threshold = 100


   def get_public_key_bytes(self) -> bytes:
       return self.public_key.public_bytes(
           encoding=serialization.Encoding.PEM,
           format=serialization.PublicFormat.SubjectPublicKeyInfo
       )


   def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
       session_key = secrets and techniques.token_bytes(32)
       self.session_keys[partner_id] = session_key
       partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
       encrypted_session_key = partner_public_key.encrypt(
           session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.log_security_event("SESSION_ESTABLISHED", 0.1, {"companion": partner_id})
       return encrypted_session_key


   def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
       session_key = self.private_key.decrypt(
           encrypted_session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.session_keys[partner_id] = session_key

We outline the CryptoAgent class and initialize its keys, session storage, and safety monitoring system. We then implement strategies that allow brokers generate and change RSA public keys and set up safe hybrid-encrypted periods. Check out the FULL CODES here.

 def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
       if partner_id not in self.session_keys:
           increase ValueError(f"No session established with {partner_id}")
       self.encryption_count += 1
       if self.encryption_count >= self.key_rotation_threshold:
           self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"rely": self.encryption_count})
       iv = secrets and techniques.token_bytes(12)
       cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
       encryptor = cipher.encryptor()
       ciphertext = encryptor.replace(plaintext.encode()) + encryptor.finalize()
       message_data = iv + ciphertext + encryptor.tag
       signature = self.sign_data(message_data)
       risk_score = self.analyze_encryption_pattern(len(plaintext))
       return {
           "sender": self.agent_id,
           "recipient": partner_id,
           "iv": iv.hex(),
           "ciphertext": ciphertext.hex(),
           "tag": encryptor.tag.hex(),
           "signature": signature.hex(),
           "timestamp": time.time(),
           "risk_score": risk_score
       }


   def decrypt_message(self, encrypted_msg: Dict) -> str:
       sender_id = encrypted_msg["sender"]
       if sender_id not in self.session_keys:
           increase ValueError(f"No session established with {sender_id}")
       iv = bytes.fromhex(encrypted_msg["iv"])
       ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
       tag = bytes.fromhex(encrypted_msg["tag"])
       cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
       decryptor = cipher.decryptor()
       plaintext = decryptor.replace(ciphertext) + decryptor.finalize()
       if encrypted_msg.get("risk_score", 0) > 0.7:
           self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
       return plaintext.decode()

We implement the encryption and decryption logic that enables brokers to talk securely utilizing AES-GCM. Each message is encrypted, signed, risk-scored, and then safely decrypted by the recipient, preserving authenticity and confidentiality. Check out the FULL CODES here.

def sign_data(self, information: bytes) -> bytes:
       return self.private_key.signal(
           information,
           padg.PSS(mgf=padg.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
           hashes.SHA256()
       )


   def analyze_encryption_pattern(self, message_length: int) -> float:
       recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
       avg_risk = np.imply([e.risk_score for e in recent_events]) if recent_events else 0.1
       risk_score = 0.1
       if message_length > 10000:
           risk_score += 0.3
       if self.encryption_count % 50 == 0 and self.encryption_count > 0:
           risk_score += 0.2
       risk_score = (risk_score + avg_risk) / 2
       self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
       return min(risk_score, 1.0)


   def log_security_event(self, event_type: str, risk_score: float, particulars: Dict):
       occasion = SecurityOccasion(timestamp=time.time(), event_type=event_type, risk_score=risk_score, particulars=particulars)
       self.security_events.append(occasion)


   def generate_security_report(self) -> Dict:
       if not self.security_events:
           return {"standing": "No occasions recorded"}
       total_events = len(self.security_events)
       high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
       avg_risk = np.imply([e.risk_score for e in self.security_events])
       event_types = {}
       for occasion in self.security_events:
           event_types[event.event_type] = event_types.get(occasion.event_type, 0) + 1
       return {
           "agent_id": self.agent_id,
           "total_events": total_events,
           "high_risk_events": len(high_risk_events),
           "average_risk_score": spherical(avg_risk, 3),
           "encryption_count": self.encryption_count,
           "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
           "event_breakdown": event_types,
           "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
       }

We add superior AI-driven elements that analyze encryption habits, detect anomalies, and log threat occasions. This snippet provides our agent adaptive intelligence, the flexibility to determine uncommon patterns, and the flexibility to generate a whole safety report. Check out the FULL CODES here.

def demo_crypto_agent_system():
   print("🔐 Advanced Cryptographic Agent System Demon")
   print("=" * 60)
   alice = CryptoAgent("Alice")
   bob = CryptoAgent("Bob")
   print("n1. Agents Created")
   print(f"   Alice ID: {alice.agent_id}")
   print(f"   Bob ID: {bob.agent_id}")
   print("n2. Establishing Secure Session (Hybrid Encryption)")
   alice_public_key = alice.get_public_key_bytes()
   bob_public_key = bob.get_public_key_bytes()
   encrypted_session_key = alice.establish_session("Bob", bob_public_key)
   bob.receive_session_key("Alice", encrypted_session_key)
   print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
   print("n3. Encrypting and Transmitting Messages")
   messages = [
       "Hello Bob! This is a secure message.",
       "The launch codes are: Alpha-7-Charlie-9",
       "Meeting at 3 PM tomorrow.",
       "This is a very long message " * 100
   ]
   for i, msg in enumerate(messages, 1):
       encrypted = alice.encrypt_message("Bob", msg)
       print(f"n   Message {i}:")
       print(f"   - Plaintext size: {len(msg)} chars")
       print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
       print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
       print(f"   - Signature: {encrypted['signature'][:40]}...")
       decrypted = bob.decrypt_message(encrypted)
       print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
       print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
   print("n4. AI-Powered Security Analysis")
   print("n   Alice's Security Report:")
   alice_report = alice.generate_security_report()
   for okay, v in alice_report.gadgets(): print(f"   - {okay}: {v}")
   print("n   Bob's Security Report:")
   bob_report = bob.generate_security_report()
   for okay, v in bob_report.gadgets(): print(f"   - {okay}: {v}")
   print("n" + "=" * 60)
   print("Demo Complete! Key Features Demonstrated:")
   print("✓ Hybrid encryption (RSA + AES-GCM)")
   print("✓ Digital signatures for authentication")
   print("✓ AI-powered anomaly detection")
   print("✓ Intelligent key rotation suggestions")
   print("✓ Real-time safety monitoring")


if __name__ == "__main__":
   demo_crypto_agent_system()

We display the complete cryptographic workflow during which two brokers securely change messages, detect anomalies, and overview detailed safety studies. We conclude the demo with an overview of all of the clever options the system performs.

In conclusion, we display how synthetic intelligence can improve conventional cryptography by introducing adaptability and context consciousness. We not solely encrypt and authenticate messages securely but in addition allow our brokers to be taught from communication habits and dynamically modify safety measures. By the top, we see how combining AI-driven analytics with hybrid encryption empowers a brand new technology of clever, self-monitoring cryptographic methods.


Check out the FULL CODES here. Feel free to try our GitHub Page for Tutorials, Codes and Notebooks. Also, be at liberty to observe us on Twitter and don’t overlook to be part of our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

The publish A Coding Guide to Build an AI-Powered Cryptographic Agent System with Hybrid Encryption, Digital Signatures, and Adaptive Security Intelligence appeared first on MarkTechPost.

Similar Posts