|

How to Build an End-to-End Production Grade Machine Learning Pipeline with ZenML, Including Custom Materializers, Metadata Tracking, and Hyperparameter Optimization

🏆

In this tutorial, we stroll via an end-to-end implementation of an superior machine studying pipeline utilizing ZenML. We start by establishing the setting and initializing a ZenML challenge, then outline a customized materializer that allows seamless serialization and metadata extraction for a domain-specific dataset object. As we progress, we construct a modular pipeline that performs information loading, preprocessing, and a fan-out hyperparameter search throughout a number of fashions. We consider every candidate, log wealthy metadata at each step, and use a fan-in technique to choose and promote the best-performing mannequin. Throughout the method, we leverage ZenML’s mannequin management aircraft, artifact monitoring, and caching mechanisms to guarantee full reproducibility, transparency, and effectivity.

import os, sys, subprocess, json, shutil
from pathlib import Path


def _sh(cmd, verify=True):
   print(f"$ {' '.be part of(cmd)}")
   return subprocess.run(cmd, verify=verify)


_sh([sys.executable, "-m", "pip", "install", "-q",
    "zenml[server]", "scikit-learn", "pandas", "pyarrow"])


PROJECT = Path("/content material/zenml_advanced_tutorial") if Path("/content material").exists() 
   else Path.cwd() / "zenml_advanced_tutorial"
if PROJECT.exists():
   shutil.rmtree(PROJECT)
PROJECT.mkdir(mother and father=True)
os.chdir(PROJECT)


os.environ["ZENML_ANALYTICS_OPT_IN"] = "false"
os.environ["ZENML_LOGGING_VERBOSITY"] = "WARN"
_sh(["zenml", "init"], verify=False)

We arrange the complete setting by putting in required libraries and initializing a ZenML challenge workspace. We create a clear working listing and configure setting variables to management logging and analytics conduct. Finally, we bootstrap the ZenML repository so that each one subsequent pipeline operations are correctly tracked and managed.

from typing import Annotated, Tuple, Dict, List, Any
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler


from zenml import pipeline, step, log_metadata, Model, get_step_context
from zenml.consumer import Client
from zenml.materializers.base_materializer import BaseMaterializer
from zenml.enums import ArtifactKind
from zenml.io import fileio


class DatasetBundle:
   def __init__(self, X, y, feature_names, stats=None):
       self.X = np.asarray(X)
       self.y = np.asarray(y)
       self.feature_names = listing(feature_names)
       self.stats = stats or {}


class DatasetBundleMaterializer(BaseMaterializer):
   ASSOCIATED_TYPES = (DatasetBundle,)
   ASSOCIATED_ARTIFACT_TYPE = ArtifactKind.DATA


   def load(self, data_type):
       with fileio.open(os.path.be part of(self.uri, "X.npy"), "rb") as f:
           X = np.load(f)
       with fileio.open(os.path.be part of(self.uri, "y.npy"), "rb") as f:
           y = np.load(f)
       with fileio.open(os.path.be part of(self.uri, "meta.json"), "r") as f:
           meta = json.hundreds(f.learn())
       return DatasetBundle(X, y, meta["feature_names"], meta["stats"])


   def save(self, bundle):
       with fileio.open(os.path.be part of(self.uri, "X.npy"), "wb") as f:
           np.save(f, bundle.X)
       with fileio.open(os.path.be part of(self.uri, "y.npy"), "wb") as f:
           np.save(f, bundle.y)
       with fileio.open(os.path.be part of(self.uri, "meta.json"), "w") as f:
           f.write(json.dumps({
               "feature_names": bundle.feature_names,
               "stats": bundle.stats,
           }))


   def extract_metadata(self, bundle):
       courses, counts = np.distinctive(bundle.y, return_counts=True)
       return {
           "n_samples": int(bundle.X.form[0]),
           "n_features": int(bundle.X.form[1]),
           "class_distribution": {str(c): int(n) for c, n in zip(courses, counts)},
       }

We import all vital libraries and outline a customized information container alongside with its materializer. We implement logic to save, load, and extract metadata from our dataset, enabling seamless artifact dealing with in ZenML. This ensures that our information is just not solely saved effectively but in addition enriched with significant, queryable metadata.

@step(enable_cache=True)
def load_data() -> Annotated[DatasetBundle, "raw_dataset"]:
   information = load_breast_cancer()
   return DatasetBundle(
       information.information, information.goal, information.feature_names,
       stats={"supply": "sklearn.datasets.load_breast_cancer"},
   )


@step
def split_and_scale(
   bundle: DatasetBundle,
   test_size: float = 0.2,
   random_state: int = 42,
) -> Tuple[
   Annotated[np.ndarray, "X_train"],
   Annotated[np.ndarray, "X_test"],
   Annotated[np.ndarray, "y_train"],
   Annotated[np.ndarray, "y_test"],
]:
   X_tr, X_te, y_tr, y_te = train_test_split(
       bundle.X, bundle.y, test_size=test_size,
       random_state=random_state, stratify=bundle.y,
   )
   scaler = StandardScaler().match(X_tr)
   X_tr, X_te = scaler.rework(X_tr), scaler.rework(X_te)
   log_metadata(metadata={"train_size": len(X_tr), "test_size": len(X_te)})
   return X_tr, X_te, y_tr, y_te


@step
def train_candidate(
   X_train: np.ndarray,
   y_train: np.ndarray,
   model_type: str = "random_forest",
   n_estimators: int = 100,
   max_depth: int = 5,
) -> Annotated[Any, "candidate_model"]:
   if model_type == "random_forest":
       m = RandomForestClassifier(n_estimators=n_estimators,
                                  max_depth=max_depth, random_state=42)
   elif model_type == "gradient_boosting":
       m = GradientBoostingClassifier(n_estimators=n_estimators,
                                      max_depth=max_depth, random_state=42)
   else:
       m = LogisticRegression(max_iter=2000, random_state=42)
   m.match(X_train, y_train)
   log_metadata(metadata={
       "model_type": model_type,
       "hyperparameters": {"n_estimators": n_estimators, "max_depth": max_depth},
   })
   return m

We outline core pipeline steps for loading information, splitting it, scaling options, and coaching mannequin candidates. We make sure that information loading is cached for effectivity whereas logging key metadata throughout preprocessing and coaching. This kinds the spine of our pipeline, the place every mannequin is educated independently with its respective configuration.

@step
def evaluate_candidate(
   mannequin: Any,
   X_test: np.ndarray,
   y_test: np.ndarray,
   label: str,
) -> Annotated[Dict[str, Any], "metrics"]:
   preds = mannequin.predict(X_test)
   probs = (mannequin.predict_proba(X_test)[:, 1]
            if hasattr(mannequin, "predict_proba") else preds)
   metrics: Dict[str, Any] = {
       "accuracy": float(accuracy_score(y_test, preds)),
       "f1":       float(f1_score(y_test, preds)),
       "roc_auc":  float(roc_auc_score(y_test, probs)),
       "label":    label,
   }
   log_metadata(metadata=metrics)
   return metrics


@step
def select_best(
   metrics_list: List[Dict[str, Any]],
   fashions: List[Any],
) -> Annotated[Any, "production_model"]:
   best_idx = max(vary(len(metrics_list)),
                  key=lambda i: metrics_list[i]["roc_auc"])
   finest = metrics_list[best_idx]


   ctx = get_step_context()
   attempt:
       ctx.mannequin.log_metadata({"chosen_candidate": finest,
                               "candidate_index": best_idx})
   besides Exception as e:
       print(f"  (mannequin metadata log skipped: {e})")


   log_metadata(metadata={
       "winning_metrics": {ok: v for ok, v in finest.gadgets() if ok != "label"},
   })
   print(f"n🏆  Best candidate: {finest['label']}  →  "
         f"ROC AUC = {finest['roc_auc']:.4f}n")
   return fashions[best_idx]

We consider every educated mannequin utilizing a number of efficiency metrics and log the outcomes. We then implement a range mechanism that identifies the best-performing mannequin primarily based on ROC AUC. Additionally, we connect related metadata to the mannequin model, enabling traceability and knowledgeable decision-making.

SEARCH_SPACE = [
   {"model_type": "random_forest",     "n_estimators": 50,  "max_depth": 3},
   {"model_type": "random_forest",     "n_estimators": 200, "max_depth": 7},
   {"model_type": "gradient_boosting", "n_estimators": 100, "max_depth": 3},
   {"model_type": "logistic",          "n_estimators": 1,   "max_depth": 1},
]


PRODUCTION_MODEL = Model(
   title="breast_cancer_classifier",
   description="Best mannequin from in-pipeline hyperparameter search",
   tags=["tutorial", "advanced"],
)


@pipeline(mannequin=PRODUCTION_MODEL, enable_cache=True)
def training_pipeline(test_size: float = 0.2):
   bundle = load_data()


   fashions, metrics = [], []
   for i, cfg in enumerate(SEARCH_SPACE):
       m = train_candidate(
           X_train, y_train, **cfg,
           id=f"train_{i}_{cfg['model_type']}",
       )
       s = evaluate_candidate(
           m, X_test, y_test,
           label=f"{cfg['model_type']}(n={cfg['n_estimators']},d={cfg['max_depth']})",
           id=f"eval_{i}",
       )
       fashions.append(m)
       metrics.append(s)


   select_best(metrics, fashions)


print("n" + "=" * 70 + "n  RUNNING TRAINING PIPELINEn" + "=" * 70)
run_obj = training_pipeline()


print("n" + "=" * 70 + "n  INSPECTING THE RUNn" + "=" * 70)
consumer = Client()
run = consumer.get_pipeline_run(run_obj.id)


print(f"nPipeline:   {run.pipeline.title}")
print(f"Run title:   {run.title}")
print(f"Status:     {run.standing}")
print(f"Step runs:  {len(run.steps)}")
for title, step_run in run.steps.gadgets():
   print(f"  • {title:35s} standing={step_run.standing}")


print("nRun-level metadata (aggregated from steps):")
for ok, v in (run.run_metadata or {}).gadgets():
   brief = str(v)
   print(f"  {ok}: {brief[:80]}{'…' if len(brief) > 80 else ''}")


print("n" + "-" * 70 + "n  MODEL CONTROL PLANEn" + "-" * 70)
attempt:
   mv = consumer.get_model_version(PRODUCTION_MODEL.title, "newest")
besides Exception:
   mv = consumer.list_model_versions(model_name_or_id=PRODUCTION_MODEL.title)[0]


print(f"Model:           {mv.mannequin.title}")
print(f"Version:         {mv.title} (quantity={mv.quantity})")
linked = listing(mv.data_artifact_ids.keys()) if hasattr(mv, "data_artifact_ids") else []
print(f"Linked outputs:  {linked or '(see dashboard)'}")
if mv.run_metadata:
   print("Version metadata:")
   for ok, v in dict(mv.run_metadata).gadgets():
       print(f"  {ok}: {str(v)[:80]}")


print("n" + "-" * 70 + "n  RELOADING ARTIFACTS DIRECTLYn" + "-" * 70)
prod_artifact = consumer.get_artifact_version("production_model")
prod_model = prod_artifact.load()
print(f"Loaded mannequin class:   {sort(prod_model).__name__}")
print(f"Artifact metadata:    {dict(prod_artifact.run_metadata) if prod_artifact.run_metadata else '{}'}"[:120])


X_test_arr = consumer.get_artifact_version("X_test").load()
y_test_arr = consumer.get_artifact_version("y_test").load()
acc = accuracy_score(y_test_arr, prod_model.predict(X_test_arr))
print(f"Sanity-check accuracy on saved X_test: {acc:.4f}")


ds_artifact = consumer.get_artifact_version("raw_dataset")
print(f"nraw_dataset auto-extracted metadata:")
for ok, v in (ds_artifact.run_metadata or {}).gadgets():
   print(f"  {ok}: {v}")


print("n" + "=" * 70 + "n  RE-RUNNING — STEPS SHOULD BE CACHEDn" + "=" * 70)
training_pipeline()


print("""
✅ Tutorial full.


What simply occurred:
 • Custom materializer serialized a website object + auto-extracted metadata.
 • Fan-out: 4 candidates educated + evaluated as 8 distinct step runs.
 • Fan-in: select_best joined them and promoted the winner.
 • Model Control Plane created a versioned 'breast_cancer_classifier'.
 • Every artifact, metric, and hyperparameter was logged and queryable.
 • Second run hit the cache — zero recomputation.


Explore farther from this identical Python session:
 Client().list_pipeline_runs()
 Client().list_model_versions(model_name_or_id="breast_cancer_classifier")
 Client().list_artifact_versions(title="metrics")
""")

We outline the total pipeline, execute it, and examine the outcomes utilizing the ZenML Client API. We carry out a fan-out over a number of configurations, adopted by a fan-in step to choose the perfect mannequin. Finally, we reveal artifact reuse, metadata inspection, and caching conduct by re-running the pipeline with out redundant computation.

In conclusion, we constructed a strong, production-style ML pipeline that demonstrates the total energy of ZenML’s orchestration capabilities. We noticed how customized materializers enrich artifacts with significant metadata, how a number of mannequin candidates could be educated and evaluated in parallel, and how the perfect mannequin is mechanically chosen and versioned. We additionally explored how to examine pipeline runs, retrieve artifacts immediately with out recomputation, and confirm mannequin efficiency utilizing saved information. Also, we noticed caching in motion throughout a re-run, confirming that redundant computations are averted. This workflow gives a robust basis for constructing scalable, maintainable, and reproducible machine studying programs in real-world eventualities.


Check out the Full Codes with Notebook here. Also, be happy to comply with us on Twitter and don’t neglect to be part of our 130k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Need to companion with us for selling your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar and many others.? Connect with us

The put up How to Build an End-to-End Production Grade Machine Learning Pipeline with ZenML, Including Custom Materializers, Metadata Tracking, and Hyperparameter Optimization appeared first on MarkTechPost.

Similar Posts