
A Coding Implementation to Explore and Analyze the TaskTrove Dataset with Streaming, Parsing, Visualization, and Verifier Detection

In this tutorial, we take a deep dive into the TaskTrove dataset on Hugging Face and build a complete, practical workflow to explore it efficiently. Instead of downloading the full multi-gigabyte dataset, we stream it directly and work with individual samples in real time. We begin by setting up the environment and inspecting the raw structure of the dataset, focusing on how each task is stored as a compressed binary blob. We then implement robust parsing logic to decode these binaries into meaningful formats such as tar archives, zip files, JSON, or plain text. Along the way, we analyze file structures, examine metadata, and build utilities to better understand the contents of each task.

import subprocess, sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U",
                      "datasets", "huggingface_hub", "polars", "pandas",
                      "matplotlib", "seaborn", "tqdm", "pyarrow"])


import os, io, gzip, json, tarfile, zipfile, base64, re, warnings
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, Dict, Iterator, List, Optional, Union


import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
from datasets import load_dataset
from huggingface_hub import HfApi


warnings.filterwarnings("ignore")
plt.rcParams["figure.dpi"] = 110
sns.set_style("whitegrid")
sns.set_palette("mako_r")


DATASET_ID = "open-thoughts/TaskTrove"
print("✓ surroundings prepared")


ds_test       = load_dataset(DATASET_ID, split="test",       streaming=True)
ds_validation = load_dataset(DATASET_ID, split="validation", streaming=True)


first = next(iter(ds_test))
print("Keys              :", list(first.keys()))
print("path              :", first["path"])
print("task_binary type  :", type(first["task_binary"]).__name__)
print("task_binary length:", len(first["task_binary"]), "bytes")

We set up the full environment by installing all required libraries and importing the essential modules. We configure visualization settings and initialize the dataset streaming pipeline to avoid downloading the full dataset. We also inspect the first sample to understand the dataset's structure and key fields.
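
Before iterating, we can optionally peek at the split metadata that the Hub publishes for the repository, without materializing any rows. This is a minimal sketch and assumes the repo exposes standard dataset info; if no split metadata is published, the streaming inspection above and below still works unchanged.

from datasets import load_dataset_builder

# Optional: read split names and sizes from repo metadata only (no data download).
builder = load_dataset_builder(DATASET_ID)
if builder.info.splits:
    for split_name, split_info in builder.info.splits.items():
        print(f"{split_name:<12} {split_info.num_examples:>8,} examples")
else:
    print("No split metadata published for this repo.")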

def to_bytes(blob) -> bytes:
    """Coerce whatever `datasets` gives us into raw bytes."""
    if isinstance(blob, (bytes, bytearray)):
        return bytes(blob)
    if isinstance(blob, list):
        return bytes(blob)
    if isinstance(blob, str):
        try:
            return base64.b64decode(blob)
        except Exception:
            return blob.encode("utf-8", errors="replace")
    return bytes(blob)




def parse_task(blob) -> Dict[str, Any]:
    """gunzip + auto-detect tar / zip / json / jsonl / text / binary."""
    raw = to_bytes(blob)
    compressed_size = len(raw)
    data = gzip.decompress(raw) if raw[:2] == b"\x1f\x8b" else raw
    raw_size = len(data)
    bio = io.BytesIO(data)

    try:
        with tarfile.open(fileobj=bio) as tar:
            files: Dict[str, Union[str, bytes]] = {}
            for m in tar.getmembers():
                if not m.isfile():
                    continue
                f = tar.extractfile(m)
                if f is None:
                    continue
                content = f.read()
                try:
                    files[m.name] = content.decode("utf-8")
                except UnicodeDecodeError:
                    files[m.name] = content
            if files:
                return {"format": "tar", "files": files,
                        "raw_size": raw_size, "compressed_size": compressed_size}
    except tarfile.TarError:
        pass

    bio.seek(0)
    try:
        with zipfile.ZipFile(bio) as zf:
            files = {}
            for name in zf.namelist():
                if name.endswith("/"):
                    continue
                with zf.open(name) as zh:
                    content = zh.read()
                    try:
                        files[name] = content.decode("utf-8")
                    except UnicodeDecodeError:
                        files[name] = content
            return {"format": "zip", "files": files,
                    "raw_size": raw_size, "compressed_size": compressed_size}
    except zipfile.BadZipFile:
        pass

    try:
        text = data.decode("utf-8")
        try:
            return {"format": "json", "content": json.loads(text),
                    "raw_size": raw_size, "compressed_size": compressed_size}
        except json.JSONDecodeError:
            try:
                objects = [json.loads(l) for l in text.splitlines() if l.strip()]
                return {"format": "jsonl", "content": objects,
                        "raw_size": raw_size, "compressed_size": compressed_size}
            except json.JSONDecodeError:
                return {"format": "text", "content": text,
                        "raw_size": raw_size, "compressed_size": compressed_size}
    except UnicodeDecodeError:
        return {"format": "binary", "content": data,
                "raw_size": raw_size, "compressed_size": compressed_size}




raw = to_bytes(first["task_binary"])
print("First 16 bytes (hex):", raw[:16].hex(" "))
task = parse_task(first["task_binary"])
print(f"Format            : {task['format']}")
print(f"Compressed size   : {task['compressed_size']:>10,} bytes")
print(f"Decompressed size : {task['raw_size']:>10,} bytes")
if task["format"] in ("tar", "zip"):
    print(f"Members           : {len(task['files'])}")
    for name in list(task["files"])[:10]:
        body = task["files"][name]
        size = len(body) if isinstance(body, (str, bytes)) else 0
        print(f"  {name:<60} {size:>8} bytes")

We build robust utilities to convert raw task binaries into usable bytes and parse them intelligently. We handle multiple formats, including tar, zip, JSON, JSONL, and plain text, with a single unified parsing function. We then decode and inspect a sample task to understand its structure and size characteristics.
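
To sanity-check the parser, we can feed it a tiny synthetic blob built entirely in memory. The payload below is hypothetical (it is not a real TaskTrove task); it simply exercises the gzip + tar path of parse_task and confirms the round trip, using only modules already imported above.

# Build a tiny gzip-compressed tar archive in memory (hypothetical payload).
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    payload = json.dumps({"instruction": "demo task"}).encode("utf-8")
    info = tarfile.TarInfo(name="task.json")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

fake_blob = gzip.compress(buf.getvalue())
parsed_demo = parse_task(fake_blob)
assert parsed_demo["format"] == "tar" and "task.json" in parsed_demo["files"]
print("parse_task round trip OK:", parsed_demo["format"], list(parsed_demo["files"]))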

def show_task(task: Dict[str, Any], json_chars: int = 1500, code_chars: int = 600) -> None:
    print("═" * 70)
    print(f"FORMAT: {task['format']}   |   compressed {task['compressed_size']:,} → "
          f"raw {task['raw_size']:,} bytes")
    print("═" * 70)
    if task["format"] not in ("tar", "zip"):
        print(task.get("content", "<binary>"))
        return
    files = task["files"]
    by_ext: Dict[str, List[str]] = defaultdict(list)
    for name in files:
        by_ext[Path(name).suffix.lower() or "<no-ext>"].append(name)
    print("\nFile-type breakdown:")
    for ext, names in sorted(by_ext.items(), key=lambda x: -len(x[1])):
        print(f"  {ext:<10} {len(names):>4} file(s)")

    meta = [n for n in files if n.lower().endswith((".json", ".yaml", ".yml", ".toml"))]
    code = [n for n in files if n.endswith(".py")]
    for name in meta[:3]:
        print(f"\n--- {name} ---")
        body = files[name]
        if isinstance(body, str):
            try:
                pretty = json.dumps(json.loads(body), indent=2)[:json_chars]
            except json.JSONDecodeError:
                pretty = body[:json_chars]
            print(pretty)
            if len(body) > json_chars:
                print(f"… ({len(body)-json_chars:,} more chars)")
    for name in code[:2]:
        print(f"\n--- {name} ---")
        body = files[name]
        if isinstance(body, str):
            print(body[:code_chars])
            if len(body) > code_chars:
                print(f"… ({len(body)-code_chars:,} more chars)")




show_task(task)




def source_of(path: str) -> str:
   return path.rsplit("-", 1)[0] if "-" in path else path




source_counts: Counter = Counter()
compressed_sizes: List[int] = []
for row in tqdm(ds_test, desc="counting paths"):
   source_counts[source_of(row["path"])] += 1
   compressed_sizes.append(len(row["task_binary"]))


print(f"nUnique supply prefixes: {len(source_counts)}")
print("Top 15 sources:")
for src, n in source_counts.most_common(15):
   print(f"  {n:>6}  {src}")


fig, axes = plt.subplots(1, 2, figsize=(14, 6))
TOP_N = 15
top = source_counts.most_common(TOP_N)
labels = [s for s, _ in top]
values = [n for _, n in top]
axes[0].barh(range(len(labels)), values, color=sns.color_palette("mako_r", len(labels)))
axes[0].set_yticks(range(len(labels)))
axes[0].set_yticklabels(labels, fontsize=9)
axes[0].invert_yaxis()
axes[0].set_xlabel("number of tasks")
axes[0].set_title(f"Top {TOP_N} sources in test split", fontweight="bold")
for i, v in enumerate(values):
    axes[0].text(v, i, f" {v:,}", va="center", fontsize=8)


axes[1].hist(np.array(compressed_sizes) / 1024, bins=50,
             color=sns.color_palette("mako_r")[2], edgecolor="white")
axes[1].set_xscale("log")
axes[1].set_xlabel("compressed size (KB, log scale)")
axes[1].set_ylabel("# tasks")
axes[1].set_title("Distribution of compressed task sizes", fontweight="bold")
p50 = np.median(compressed_sizes) / 1024
p95 = np.percentile(compressed_sizes, 95) / 1024
axes[1].axvline(p50, color="red", linestyle="--", alpha=0.7, label=f"median = {p50:.1f} KB")
axes[1].axvline(p95, color="orange", linestyle="--", alpha=0.7, label=f"p95    = {p95:.1f} KB")
axes[1].legend()
plt.tight_layout()
plt.show()

We create a detailed view of each task by printing structured file breakdowns and previews. We analyze the dataset distribution by counting source prefixes and measuring compressed task sizes. We also generate plots to better understand the dataset's composition and size distribution.
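
Because the split is streamed, the counting pass above re-reads every row over the network. When a full pass is too slow, a capped pass with itertools.islice gives approximate counts from only the first few thousand rows; the cap of 2,000 below is an arbitrary choice for illustration, not a property of the dataset.

from itertools import islice

# Approximate source counts from the first 2,000 streamed rows only.
capped_counts = Counter()
for row in islice(ds_test, 2000):
    capped_counts[source_of(row["path"])] += 1
print(capped_counts.most_common(5))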

filename_counter: Counter = Counter()
all_json_keys:    Counter = Counter()
samples_for_show: List = []


for i, row in enumerate(tqdm(ds_test, desc="inspecting structure", total=200)):
    if i >= 200:
        break
    p = parse_task(row["task_binary"])
    if p["format"] in ("tar", "zip"):
        for name, body in p["files"].items():
            filename_counter[name] += 1
            if name.endswith(".json") and isinstance(body, str):
                try:
                    obj = json.loads(body)
                    if isinstance(obj, dict):
                        for k in obj.keys():
                            all_json_keys[k] += 1
                except Exception:
                    pass
        if len(samples_for_show) < 2:
            samples_for_show.append((row["path"], p))


print("nMost widespread filenames inside process archives:")
for title, n in filename_counter.most_common(15):
   print(f"  {n:>4}  {title}")


print("nMost widespread top-level JSON keys (throughout any *.json):")
for okay, n in all_json_keys.most_common(20):
   print(f"  {n:>4}  {okay}")


if samples_for_show:
   print(f"nFull file itemizing for one pattern process ({samples_for_show[0][0]}):")
   for title, physique in samples_for_show[0][1]["files"].objects():
       sz = len(physique) if isinstance(physique, (str, bytes)) else 0
       print(f"  {title}  ({sz:,} B)")




VERIFIER_FILE_PATTERNS = ("verifier", "verify", "grader", "judge", "score", "eval")
VERIFIER_JSON_KEYS     = ("verifier", "verifier_config", "judge", "grader",
                          "rubric", "test_patch", "FAIL_TO_PASS", "tests")




def has_verifier(parsed: Dict[str, Any]) -> bool:
    """Detect verifiers via filename, JSON content, or both."""
    if parsed["format"] not in ("tar", "zip"):
        c = parsed.get("content")
        if isinstance(c, dict):
            return any(k in c for k in VERIFIER_JSON_KEYS)
        return False

    files = parsed["files"]

    for name in files:
        low = name.lower()
        if any(pat in low for pat in VERIFIER_FILE_PATTERNS):
            return True

    for name, body in files.items():
        if name.endswith((".json", ".yaml", ".yml")) and isinstance(body, str):
            try:
                obj = json.loads(body)
                if isinstance(obj, dict) and any(k in obj for k in VERIFIER_JSON_KEYS):
                    return True
            except Exception:
                pass
            low = body.lower()
            if "verifier" in low or "test_patch" in low:
                return True

    return False
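
A few hand-built, hypothetical inputs confirm that both detection paths behave as intended: the filename match for archives and the top-level JSON key match for non-archive tasks.

# Synthetic sanity checks (hypothetical dictionaries, not real tasks).
assert has_verifier({"format": "tar", "files": {"verifier.py": "print('ok')"}})
assert has_verifier({"format": "json", "content": {"test_patch": "diff --git a b"}})
assert not has_verifier({"format": "text", "content": "just a prompt"})
print("✓ has_verifier sanity checks passed")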




class TaskTroveExplorer:
    """High-level interface to the open-thoughts/TaskTrove dataset."""

    def __init__(self, split: str = "test", dataset_id: str = DATASET_ID):
        self.dataset_id = dataset_id
        self.split = split
        self._ds = load_dataset(dataset_id, split=split, streaming=True)

    def iter(self, limit: Optional[int] = None,
             source_filter: Optional[str] = None) -> Iterator[Dict[str, Any]]:
        rx = re.compile(source_filter) if source_filter else None
        n = 0
        for row in self._ds:
            if rx and not rx.search(source_of(row["path"])):
                continue
            yield row
            n += 1
            if limit is not None and n >= limit:
                return

    def sample(self, n: int = 5,
               source_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        out = []
        for row in self.iter(limit=n, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            parsed["path"] = row["path"]
            parsed["source"] = source_of(row["path"])
            out.append(parsed)
        return out

    def summary(self, limit: int = 1000,
                source_filter: Optional[str] = None) -> pd.DataFrame:
        rows = []
        for row in self.iter(limit=limit, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            rows.append({
                "source": source_of(row["path"]),
                "compressed": parsed["compressed_size"],
                "raw": parsed["raw_size"],
                "format": parsed["format"],
                "n_files": len(parsed.get("files", {})),
                "has_verifier": has_verifier(parsed),
            })
        df = pd.DataFrame(rows)
        if df.empty:
            return df
        return (df.groupby("source")
                  .agg(n=("compressed", "count"),
                       mean_compressed_kb=("compressed", lambda s: s.mean() / 1024),
                       mean_raw_kb=("raw", lambda s: s.mean() / 1024),
                       mean_n_files=("n_files", "mean"),
                       verifier_rate=("has_verifier", "mean"))
                  .round(2)
                  .sort_values("n", ascending=False))

    @staticmethod
    def has_verifier(parsed: Dict[str, Any]) -> bool:
        return has_verifier(parsed)

    def export(self, output_dir: Union[str, Path], n: int = 10,
               source_filter: Optional[str] = None) -> Path:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        for parsed in self.sample(n=n, source_filter=source_filter):
            slug = parsed["path"].replace("/", "_")
            tdir = output_dir / slug
            tdir.mkdir(exist_ok=True)
            if parsed["format"] in ("tar", "zip"):
                for name, body in parsed["files"].items():
                    out = tdir / name
                    out.parent.mkdir(parents=True, exist_ok=True)
                    if isinstance(body, str):
                        out.write_text(body, encoding="utf-8")
                    else:
                        out.write_bytes(body)
            else:
                content = parsed.get("content", b"")
                if isinstance(content, (dict, list)):
                    (tdir / "task.json").write_text(json.dumps(content, indent=2))
                elif isinstance(content, str):
                    (tdir / "task.txt").write_text(content)
                else:
                    (tdir / "task.bin").write_bytes(content)
        print(f"✓ exported tasks to {output_dir.resolve()}")
        return output_dir




explorer = TaskTroveExplorer(split="test")


print("\nSample of 3 parsed tasks:")
for s in explorer.sample(n=3):
    print(f"path: {s['path']} | source: {s['source']} | format: {s['format']} | "
          f"files: {len(s.get('files', {}))} | verifier: {has_verifier(s)}")

We inspect the internal structure of tasks in depth by analyzing filenames and extracting common JSON keys. We implement a multi-signal verifier detection system to identify tasks suitable for evaluation or RL workflows. We also build a reusable explorer class that lets us sample, summarize, and export tasks efficiently.
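
The source_filter hook accepts a regular expression over the source prefix, which makes it easy to drill into a single source family. The prefix used below ("swe") is only an illustrative guess; substitute any prefix that appears in the source listing above, and note that a filter matching nothing will stream through many rows before giving up.

# Sample tasks whose source prefix matches a regex ("swe" is a hypothetical prefix).
swe_like = explorer.sample(n=2, source_filter=r"swe")
for s in swe_like:
    print(s["source"], "|", s["format"], "|", len(s.get("files", {})), "files")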

summary = explorer.summary(limit=1000)
print(f"\nSummary across {len(summary)} sources (1000 sampled rows):")
print(summary.head(20))


if not summary.empty:
    top_sources = summary.head(12)
    fig, ax = plt.subplots(figsize=(11, 6))
    x = np.arange(len(top_sources))
    w = 0.4
    ax.bar(x - w/2, top_sources["mean_compressed_kb"], w, label="compressed (KB)",
           color=sns.color_palette("mako_r")[2])
    ax.bar(x + w/2, top_sources["mean_raw_kb"], w, label="decompressed (KB)",
           color=sns.color_palette("mako_r")[5])
    ax.set_xticks(x)
    ax.set_xticklabels(top_sources.index, rotation=40, ha="right", fontsize=9)
    ax.set_ylabel("size (KB)")
    ax.set_yscale("log")
    ax.set_title("Mean task size by source (top 12 by row count)", fontweight="bold")
    ax.legend()
    plt.tight_layout()
    plt.show()


    fig, ax = plt.subplots(figsize=(11, 5))
    vs = summary.head(15)["verifier_rate"].sort_values()
    colors = sns.color_palette("RdYlGn", as_cmap=True)(vs.values)
    ax.barh(range(len(vs)), vs.values, color=colors)
    ax.set_yticks(range(len(vs)))
    ax.set_yticklabels(vs.index, fontsize=9)
    ax.set_xlabel("fraction of tasks with verifier signal")
    ax.set_xlim(0, 1)
    ax.set_title("Verifier presence by source\n(green = verified ⇒ usable for RL)",
                 fontweight="bold")
    for i, v in enumerate(vs.values):
        ax.text(min(v + 0.01, 0.97), i, f"{v:.0%}", va="center", fontsize=9)
    plt.tight_layout()
    plt.show()




verified_task = None
for row in tqdm(ds_test, desc="attempting to find a verified process"):
   parsed = parse_task(row["task_binary"])
   if has_verifier(parsed):
       parsed["path"] = row["path"]
       parsed["source"] = source_of(row["path"])
       verified_task = parsed
       break


if verified_task is None:
    print("No verified task found in the test split — try the validation split.")
else:
    print(f"Found verified task: {verified_task['path']}")
    print(f"Source             : {verified_task['source']}")
    if verified_task["format"] in ("tar", "zip"):
        candidates = []
        for n in verified_task["files"]:
            low = n.lower()
            score = sum(p in low for p in VERIFIER_FILE_PATTERNS)
            if n.endswith((".json", ".yaml", ".yml", ".py")):
                score += 1
            candidates.append((score, n))
        candidates.sort(reverse=True)
        for _, name in candidates[:2]:
            body = verified_task["files"][name]
            if isinstance(body, str):
                print(f"\n--- {name} ({len(body):,} chars) ---")
                print(body[:2000])
                if len(body) > 2000:
                    print(f"… ({len(body)-2000:,} more chars)")




EXPORT_DIR = Path("/content/tasktrove_export") if Path("/content").exists() \
             else Path("./tasktrove_export")
EXPORT_DIR.mkdir(exist_ok=True)
explorer.export(EXPORT_DIR, n=5)


for task_dir in sorted(EXPORT_DIR.iterdir())[:3]:
    print("─" * 60)
    print(task_dir.name)
    for sub in sorted(task_dir.rglob("*"))[:8]:
        if sub.is_file():
            print(f"  {sub.relative_to(task_dir)}  ({sub.stat().st_size:,} B)")




rows: List[Dict[str, Any]] = []
MAX_TASKS = 500
n_seen = 0


for row in tqdm(ds_test, desc="constructing slice", complete=MAX_TASKS):
   parsed = parse_task(row["task_binary"])
   n_seen += 1
   src = source_of(row["path"])
   is_verified = has_verifier(parsed) or "verifier" in src.decrease()


   recordsdata = parsed.get("recordsdata", {})
   instruction = ""
   for title in recordsdata:
       if title.endswith((".json", ".md", ".txt")) and isinstance(recordsdata[name], str):
           if len(recordsdata[name]) > len(instruction):
               instruction = recordsdata[name]


   rows.append({
       "path": row["path"],
       "supply": src,
       "is_verified": bool(is_verified),
       "n_files": len(recordsdata),
       "compressed_kb": parsed["compressed_size"] / 1024,
       "raw_kb": parsed["raw_size"] / 1024,
       "instruction_preview": instruction[:300],
   })
   if len(rows) >= MAX_TASKS:
       break


df = pl.DataFrame(rows)
print(f"nInspected {n_seen} rows, saved {len(df)} complete "
     f"({df['is_verified'].sum() if len(df) else 0} flagged verified)")
if len(df):
   print(df.head(5))


if len(df) == 0:
    print("Empty slice — nothing to aggregate or save.")
else:
    grouped = (df.group_by("source")
                 .agg([pl.len().alias("n"),
                       pl.col("is_verified").sum().alias("n_verified"),
                       pl.col("raw_kb").mean().round(1).alias("mean_raw_kb"),
                       pl.col("n_files").mean().round(1).alias("mean_n_files")])
                 .sort("n", descending=True))
    print("\nSlice composition by source:")
    print(grouped)

    out_path = (Path("/content") if Path("/content").exists() else Path(".")) \
               / "tasktrove_slice.parquet"
    df.write_parquet(out_path)
    print(f"\n✓ wrote {len(df)} rows to {out_path} "
          f"({out_path.stat().st_size/1024:.1f} KB)")




api = HfApi()
files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset")
subdirs = sorted({f.split("/", 1)[0] for f in files
                  if "/" in f and "__" in f.split("/", 1)[0]})
print(f"\nFound {len(subdirs)} source-dataset subdirectories. First 25:")
for s in subdirs[:25]:
   print(" ", s)

We aggregate statistics across sources and visualize key metrics, such as task size and verifier presence. We identify and inspect a verified task to understand how evaluation signals are structured. Finally, we build a clean dataset slice, export it, and prepare it for downstream analysis or modeling workflows.
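
As a closing sketch of how the slice plugs into downstream work, we can reload the Parquet file and keep only the rows that carry a verifier signal. This assumes the slice was written in the previous step and uses only columns defined above.

# Reload the saved slice and filter to verified tasks (assumes out_path exists).
slice_df = pl.read_parquet(out_path)
verified_only = slice_df.filter(pl.col("is_verified"))
print(f"{len(verified_only)} of {len(slice_df)} sampled tasks are flagged verified")
print(verified_only.select(["path", "source", "compressed_kb", "raw_kb"]).head(5))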

In conclusion, we built a complete pipeline to explore, analyze, and extract value from the TaskTrove dataset. We generated insights into source distributions, task sizes, and internal file patterns, and built mechanisms to detect verifier signals that indicate high-quality, evaluation-ready tasks. We also created reusable tools, such as the TaskTroveExplorer class, to sample, summarize, and export tasks for downstream use. In addition, we produced a clean, structured dataset slice that can be used directly for evaluation, benchmarking, or reinforcement learning workflows. Through this process, we learn how to handle complex dataset formats efficiently and establish a scalable approach to working with large, structured AI datasets in real-world scenarios.

