Step by Step Guide to Build an End-to-End Model Optimization Pipeline with NVIDIA Model Optimizer Using FastNAS Pruning and Fine-Tuning

In this tutorial, we build a complete end-to-end pipeline using NVIDIA Model Optimizer to train, prune, and fine-tune a deep learning model directly in Google Colab. We start by setting up the environment and preparing the CIFAR-10 dataset, then define a ResNet architecture and train it to establish a strong baseline. From there, we apply FastNAS pruning to systematically reduce the model’s complexity under FLOPs constraints while preserving performance. We also handle real-world compatibility issues, restore the optimized subnet, and fine-tune it to recover accuracy. By the end, we have a fully working workflow that takes a model from training to deployment-ready optimization, all within a single streamlined setup. Check out the Full Implementation Coding Notebook.

!pip -q install -U nvidia-modelopt torchvision torchprofile tqdm


import math
import os
import random
import time


import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms


from torch.utils.data import DataLoader, Subset
from torchvision.models.resnet import BasicBlock
from tqdm.auto import tqdm


import modelopt.torch.opt as mto
import modelopt.torch.prune as mtp


SEED = 123
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
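if torch.cuda.is_available():
   torch.cuda.manual_seed_all(SEED)


# Select the GPU when available; later cells refer to this as `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")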


FAST_MODE = True


batch_size = 256 if FAST_MODE else 512
baseline_epochs = 20 if FAST_MODE else 120
finetune_epochs = 12 if FAST_MODE else 120


train_subset_size = 12000 if FAST_MODE else None
val_subset_size   = 2000  if FAST_MODE else None
test_subset_size  = 4000  if FAST_MODE else None


target_flops = 60e6

We begin by installing all required dependencies and importing the necessary libraries to set up our environment. We initialize seeds to ensure reproducibility and configure the device to leverage a GPU if available. We also define key runtime parameters, such as batch size, epochs, dataset subsets, and FLOP constraints, to control the overall experiment.
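To make these settings concrete, the short sketch below works out how many optimizer steps each phase runs under `FAST_MODE`. It assumes the loader keeps the final partial batch (the `DataLoader` default), and `steps_for` is a helper name introduced here purely for illustration.

```python
import math

# Values copied from the configuration above (FAST_MODE = True).
batch_size = 256
baseline_epochs = 20
finetune_epochs = 12
train_subset_size = 12000


def steps_for(num_samples, batch_size, epochs):
    """Return (steps_per_epoch, total_steps), keeping the last partial batch."""
    steps_per_epoch = math.ceil(num_samples / batch_size)
    return steps_per_epoch, steps_per_epoch * epochs


baseline_steps = steps_for(train_subset_size, batch_size, baseline_epochs)
finetune_steps = steps_for(train_subset_size, batch_size, finetune_epochs)

print("baseline:", baseline_steps)   # (47, 940)
print("fine-tune:", finetune_steps)  # (47, 564)
```

These step counts matter later because the learning-rate schedule in this tutorial advances once per batch rather than once per epoch.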

def seed_worker(worker_id):
   worker_seed = SEED + worker_id
   np.random.seed(worker_seed)
   random.seed(worker_seed)


def build_cifar10_loaders(train_batch_size=256,
                         train_subset_size=None,
                         val_subset_size=None,
                         test_subset_size=None):
   normalize = transforms.Normalize(
       mean=[0.4914, 0.4822, 0.4465],
       std=[0.2470, 0.2435, 0.2616],
   )


   train_transform = transforms.Compose([
       transforms.ToTensor(),
       transforms.RandomHorizontalFlip(),
       transforms.RandomCrop(32, padding=4),
       normalize,
   ])
   eval_transform = transforms.Compose([
       transforms.ToTensor(),
       normalize,
   ])


   train_full = torchvision.datasets.CIFAR10(
       root="./data", train=True, transform=train_transform, download=True
   )
   val_full = torchvision.datasets.CIFAR10(
       root="./data", train=True, transform=eval_transform, download=True
   )
   test_full = torchvision.datasets.CIFAR10(
       root="./data", train=False, transform=eval_transform, download=True
   )


   n_trainval = len(train_full)
   ids = np.arange(n_trainval)
   np.random.shuffle(ids)


   n_train = int(n_trainval * 0.9)
   train_ids = ids[:n_train]
   val_ids = ids[n_train:]


   if train_subset_size is not None:
       train_ids = train_ids[:min(train_subset_size, len(train_ids))]
   if val_subset_size is not None:
       val_ids = val_ids[:min(val_subset_size, len(val_ids))]


   test_ids = np.arange(len(test_full))
   if test_subset_size is not None:
       test_ids = test_ids[:min(test_subset_size, len(test_ids))]


   train_ds = Subset(train_full, train_ids.tolist())
   val_ds = Subset(val_full, val_ids.tolist())
   test_ds = Subset(test_full, test_ids.tolist())


   num_workers = min(2, os.cpu_count() or 1)


   g = torch.Generator()
   g.manual_seed(SEED)


   train_loader = DataLoader(
       train_ds,
       batch_size=train_batch_size,
       shuffle=True,
       num_workers=num_workers,
       pin_memory=torch.cuda.is_available(),
       worker_init_fn=seed_worker,
       generator=g,
   )
   val_loader = DataLoader(
       val_ds,
       batch_size=512,
       shuffle=False,
       num_workers=num_workers,
       pin_memory=torch.cuda.is_available(),
       worker_init_fn=seed_worker,
   )
   test_loader = DataLoader(
       test_ds,
       batch_size=512,
       shuffle=False,
       num_workers=num_workers,
       pin_memory=torch.cuda.is_available(),
       worker_init_fn=seed_worker,
   )


   print(f"Train: {len(train_ds)} | Val: {len(val_ds)} | Test: {len(test_ds)}")
   return train_loader, val_loader, test_loader


train_loader, val_loader, test_loader = build_cifar10_loaders(
   train_batch_size=batch_size,
   train_subset_size=train_subset_size,
   val_subset_size=val_subset_size,
   test_subset_size=test_subset_size,
)

We build the full data pipeline by preparing the CIFAR-10 datasets with appropriate augmentations and normalization. We split the training set into training and validation parts and, in fast mode, subsample each part to reduce its size and speed up experimentation. We then create efficient data loaders that ensure proper batching, shuffling, and reproducible data handling.
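The split logic is easy to check in isolation. The sketch below mirrors it with the standard library only: shuffle the indices once, cut them 90/10, then truncate each part. The helper name `split_indices` and the use of `random.Random` instead of NumPy are our own simplifications, so the exact indices will differ from the ones the loader code picks.

```python
import random

SEED = 123
N_TRAINVAL = 50_000    # size of the CIFAR-10 training set
TRAIN_FRACTION = 0.9


def split_indices(n, train_fraction, train_cap=None, val_cap=None, seed=SEED):
    """Shuffle 0..n-1 once, cut into train/val, then optionally truncate each part."""
    ids = list(range(n))
    random.Random(seed).shuffle(ids)
    n_train = int(n * train_fraction)
    train_ids, val_ids = ids[:n_train], ids[n_train:]
    if train_cap is not None:
        train_ids = train_ids[:train_cap]
    if val_cap is not None:
        val_ids = val_ids[:val_cap]
    return train_ids, val_ids


train_ids, val_ids = split_indices(N_TRAINVAL, TRAIN_FRACTION, 12000, 2000)
print(len(train_ids), len(val_ids))
print("overlap:", len(set(train_ids) & set(val_ids)))
```

Because both parts come from a single shuffled permutation, the validation images never overlap with the training images, even though both are drawn from the CIFAR-10 training set.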

def _weights_init(m):
   if isinstance(m, (nn.Linear, nn.Conv2d)):
       nn.init.kaiming_normal_(m.weight)


class LambdaLayer(nn.Module):
   def __init__(self, lambd):
       super().__init__()
       self.lambd = lambd


   def forward(self, x):
       return self.lambd(x)


class ResNet(nn.Module):
   def __init__(self, num_blocks, num_classes=10):
       super().__init__()
       self.in_planes = 16
       self.layers = nn.Sequential(
           nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False),
           nn.BatchNorm2d(16),
           nn.ReLU(),
           self._make_layer(16, num_blocks, stride=1),
           self._make_layer(32, num_blocks, stride=2),
           self._make_layer(64, num_blocks, stride=2),
           nn.AdaptiveAvgPool2d((1, 1)),
           nn.Flatten(),
           nn.Linear(64, num_classes),
       )
       self.apply(_weights_init)


   def _make_layer(self, planes, num_blocks, stride):
       strides = [stride] + [1] * (num_blocks - 1)
       layers = []
       for s in strides:
           downsample = None
           if s != 1 or self.in_planes != planes:
               downsample = LambdaLayer(
                   lambda x: F.pad(
                       x[:, :, ::2, ::2],
                       (0, 0, 0, 0, planes // 4, planes // 4),
                       "constant",
                       0,
                   )
               )
           layers.append(BasicBlock(self.in_planes, planes, s, downsample))
           self.in_planes = planes
       return nn.Sequential(*layers)


   def forward(self, x):
       return self.layers(x)


def resnet20():
   return ResNet(num_blocks=3).to(device)

We define the ResNet20 architecture from scratch, including custom initialization and shortcut handling via lambda layers. We structure the network using convolutional blocks and residual connections to capture hierarchical features. We finally encapsulate the model creation into a reusable function that moves it directly to the selected device.
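Since the pruning step is driven by a compute budget, it helps to know roughly how much compute the dense network uses. The sketch below counts multiply-accumulates (MACs) for the convolutions and the final linear layer of this ResNet20 on 32x32 inputs by hand. It ignores batch norm, ReLU, pooling, and the parameter-free shortcuts, and the helper names are ours. Profilers differ on whether they report MACs or count each multiply-accumulate as two FLOPs, so treat the result as a sanity check and compare it with what your profiler reports before choosing a budget.

```python
def conv_macs(c_in, c_out, kernel, out_hw):
    """Multiply-accumulates of one bias-free conv with a square kernel and square output."""
    return c_in * c_out * kernel * kernel * out_hw * out_hw


def resnet20_macs(num_blocks=3, num_classes=10):
    total = conv_macs(3, 16, 3, 32)               # stem conv on 32x32 inputs
    in_planes, hw = 16, 32
    for planes, stride in [(16, 1), (32, 2), (64, 2)]:
        for block in range(num_blocks):
            s = stride if block == 0 else 1
            hw //= s                               # a stride-2 block halves the feature map
            total += conv_macs(in_planes, planes, 3, hw)  # first conv of the block
            total += conv_macs(planes, planes, 3, hw)     # second conv of the block
            in_planes = planes
    total += 64 * num_classes                      # final linear layer
    return total


macs = resnet20_macs()
print(f"{macs:,} MACs (~{macs / 1e6:.2f}M)")  # 40,551,040 MACs (~40.55M)
```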

class CosineLRwithWarmup(torch.optim.lr_scheduler._LRScheduler):
   def __init__(self, optimizer, warmup_steps, decay_steps, warmup_lr=0.0, last_epoch=-1):
       self.warmup_steps = warmup_steps
       self.warmup_lr = warmup_lr
       self.decay_steps = max(decay_steps, 1)
       super().__init__(optimizer, last_epoch)


   def get_lr(self):
       if self.last_epoch < self.warmup_steps:
           return [
               (base_lr - self.warmup_lr) * self.last_epoch / max(self.warmup_steps, 1) + self.warmup_lr
               for base_lr in self.base_lrs
           ]
       current_steps = self.last_epoch - self.warmup_steps
       return [
           0.5 * base_lr * (1 + math.cos(math.pi * current_steps / self.decay_steps))
           for base_lr in self.base_lrs
       ]


def get_optimizer_scheduler(model, lr, weight_decay, warmup_steps, decay_steps):
   optimizer = torch.optim.SGD(
       filter(lambda p: p.requires_grad, model.parameters()),
       lr=lr,
       momentum=0.9,
       weight_decay=weight_decay,
   )
   scheduler = CosineLRwithWarmup(optimizer, warmup_steps, decay_steps)
   return optimizer, scheduler


def loss_fn_default(model, outputs, labels):
   return F.cross_entropy(outputs, labels)


def train_one_epoch(model, loader, optimizer, scheduler, loss_fn=loss_fn_default):
   model.train()
   running_loss = 0.0
   total = 0
   for images, labels in loader:
       images = images.to(device, non_blocking=True)
       labels = labels.to(device, non_blocking=True)


       outputs = model(images)
       loss = loss_fn(model, outputs, labels)


       optimizer.zero_grad(set_to_none=True)
       loss.backward()
       optimizer.step()
       scheduler.step()  # the schedule advances once per batch, not once per epoch


       running_loss += loss.item() * labels.size(0)
       total += labels.size(0)


   return running_loss / max(total, 1)


@torch.no_grad()
def evaluate(model, loader):
   model.eval()
   correct = 0
   total = 0
   for images, labels in loader:
       images = images.to(device, non_blocking=True)
       labels = labels.to(device, non_blocking=True)
       logits = model(images)
       preds = logits.argmax(dim=1)
       correct += (preds == labels).sum().item()
       total += labels.size(0)
   return 100.0 * correct / max(total, 1)


def train_model(model, train_loader, val_loader, epochs, ckpt_path,
               lr=None, weight_decay=1e-4, print_every=1):
   if lr is None:
       lr = 0.1 * batch_size / 128


   steps_per_epoch = len(train_loader)
   warmup_steps = max(1, 2 * steps_per_epoch if FAST_MODE else 5 * steps_per_epoch)
   decay_steps = max(1, epochs * steps_per_epoch)


   optimizer, scheduler = get_optimizer_scheduler(
       model=model,
       lr=lr,
       weight_decay=weight_decay,
       warmup_steps=warmup_steps,
       decay_steps=decay_steps,
   )


   best_val = -1.0
   best_epoch = -1


   print(f"Training for {epochs} epochs...")
   for epoch in tqdm(range(1, epochs + 1)):
       train_loss = train_one_epoch(model, train_loader, optimizer, scheduler)
       val_acc = evaluate(model, val_loader)


       # Keep the checkpoint with the best validation accuracy seen so far.
       if val_acc >= best_val:
           best_val = val_acc
           best_epoch = epoch
           torch.save(model.state_dict(), ckpt_path)


       if epoch == 1 or epoch % print_every == 0 or epoch == epochs:
           print(f"Epoch {epoch:03d} | train_loss={train_loss:.4f} | val_acc={val_acc:.2f}%")


   model.load_state_dict(torch.load(ckpt_path, map_location=device))
   print(f"Restored best checkpoint from epoch {best_epoch} with val_acc={best_val:.2f}%")
   return model, best_val

We implement the training utilities, including a cosine learning rate scheduler with warmup, to enable stable optimization. We define loss computation, a training loop for one epoch, and an evaluation function to measure accuracy. We then build a complete training pipeline that tracks the best model and restores it based on validation performance.
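To get a feel for the schedule, the sketch below reimplements the same warmup-then-cosine formula as a plain function of the step index, with no PyTorch dependency. The function name `lr_at` is ours, and the numbers assume the fast-mode baseline run (47 steps per epoch, 20 epochs, 2 warmup epochs, base rate 0.1 * 256 / 128 = 0.2).

```python
import math


def lr_at(step, base_lr, warmup_steps, decay_steps, warmup_lr=0.0):
    """Learning rate at a given optimizer step: linear warmup, then cosine decay."""
    if step < warmup_steps:
        return (base_lr - warmup_lr) * step / max(warmup_steps, 1) + warmup_lr
    progress = (step - warmup_steps) / max(decay_steps, 1)
    return 0.5 * base_lr * (1 + math.cos(math.pi * progress))


base_lr, warmup_steps, decay_steps = 0.2, 94, 940

# Sample the schedule at the start, mid-warmup, the peak, mid-run, and the final step.
for step in (0, 47, 94, 517, 940):
    print(step, round(lr_at(step, base_lr, warmup_steps, decay_steps), 4))
```

One detail this makes visible: the cosine phase only starts after warmup, while `decay_steps` spans the whole run, so under these settings the rate is still around 0.005 at the last step rather than reaching zero. That is harmless here, but worth knowing if you expect the schedule to anneal fully.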

baseline_model = resnet20()
baseline_ckpt = "resnet20_baseline.pth"


start = time.time()
baseline_model, baseline_val = train_model(
   baseline_model,
   train_loader,
   val_loader,
   epochs=baseline_epochs,
   ckpt_path=baseline_ckpt,
   lr=0.1 * batch_size / 128,
   weight_decay=1e-4,
   print_every=max(1, baseline_epochs // 4),
)
baseline_test = evaluate(baseline_model, test_loader)
baseline_time = time.time() - start


print(f"\nBaseline validation accuracy: {baseline_val:.2f}%")
print(f"Baseline test accuracy:       {baseline_test:.2f}%")
print(f"Baseline training time:       {baseline_time/60:.2f} min")


fastnas_cfg = mtp.fastnas.FastNASConfig()
fastnas_cfg["nn.Conv2d"]["*"]["channel_divisor"] = 16
fastnas_cfg["nn.BatchNorm2d"]["*"]["feature_divisor"] = 16


dummy_input = torch.randn(1, 3, 32, 32, device=device)


def score_func(model):
   return evaluate(model, val_loader)


search_ckpt = "modelopt_search_checkpoint_fastnas.pth"
pruned_ckpt = "modelopt_pruned_model_fastnas.pth"


import torchprofile.profile as tp_profile
from torchprofile.handlers import HANDLER_MAP


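# Compatibility patch: if this torchprofile build has no `profile.handlers`, rebuild it from HANDLER_MAP.
if not hasattr(tp_profile, "handlers"):
   tp_profile.handlers = tuple((tuple([op_name]), handler) for op_name, handler in HANDLER_MAP.items())


print("\nRunning FastNAS pruning...")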
prune_start = time.time()


model_for_prune = resnet20()
model_for_prune.load_state_dict(torch.load(baseline_ckpt, map_location=device))


pruned_model, pruned_metadata = mtp.prune(
   model=model_for_prune,
   mode=[("fastnas", fastnas_cfg)],
   constraints={"flops": target_flops},
   dummy_input=dummy_input,
   config={
       "data_loader": train_loader,
       "score_func": score_func,
       "checkpoint": search_ckpt,
   },
)


mto.save(pruned_model, pruned_ckpt)
prune_elapsed = time.time() - prune_start


pruned_test_before_ft = evaluate(pruned_model, test_loader)


print(f"Pruned model test accuracy before fine-tune: {pruned_test_before_ft:.2f}%")
print(f"Pruning/search time: {prune_elapsed/60:.2f} min")

We train the baseline model and evaluate its performance to establish a reference point for optimization. We then configure FastNAS pruning, define constraints, and apply a compatibility patch to ensure correct FLOPs profiling. We execute the pruning process to generate a compressed model and evaluate its performance before fine-tuning.
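The two divisor settings in the FastNAS config restrict which layer widths the search may consider. As a rough mental model only, the sketch below lists the widths that are multiples of the divisor up to the original width. This is not Model Optimizer's code: `channel_choices` is a hypothetical helper, and the real search space is also shaped by other config fields that this tutorial leaves at their defaults.

```python
def channel_choices(original_channels, divisor):
    """Multiples of `divisor` up to the original width (illustrative assumption)."""
    return list(range(divisor, original_channels + 1, divisor))


# The three stage widths used by the ResNet20 in this tutorial.
for width in (16, 32, 64):
    print(width, "->", channel_choices(width, divisor=16))
# 16 -> [16]
# 32 -> [16, 32]
# 64 -> [16, 32, 48, 64]
```

Under this reading, a divisor of 16 leaves little room to shrink the 16-channel layers, so most of the savings would have to come from the wider stages. A smaller divisor would give the search more options at the cost of a larger search space.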

restored_pruned_model = resnet20()
restored_pruned_model = mto.restore(restored_pruned_model, pruned_ckpt)


restored_test = evaluate(restored_pruned_model, test_loader)
print(f"Restored pruned model test accuracy: {restored_test:.2f}%")


print("\nFine-tuning pruned model...")
finetune_ckpt = "resnet20_pruned_finetuned.pth"


start_ft = time.time()
restored_pruned_model, pruned_val_after_ft = train_model(
   restored_pruned_model,
   train_loader,
   val_loader,
   epochs=finetune_epochs,
   ckpt_path=finetune_ckpt,
   lr=0.05 * batch_size / 128,
   weight_decay=1e-4,
   print_every=max(1, finetune_epochs // 4),
)
pruned_test_after_ft = evaluate(restored_pruned_model, test_loader)
ft_time = time.time() - start_ft


print(f"\nFine-tuned pruned validation accuracy: {pruned_val_after_ft:.2f}%")
print(f"Fine-tuned pruned test accuracy:       {pruned_test_after_ft:.2f}%")
print(f"Fine-tuning time:                      {ft_time/60:.2f} min")


def count_params(model):
   return sum(p.numel() for p in model.parameters())


def count_nonzero_params(model):
   total = 0
   for p in model.parameters():
       total += (p.detach() != 0).sum().item()
   return total


baseline_params = count_params(baseline_model)
pruned_params = count_params(restored_pruned_model)


baseline_nonzero = count_nonzero_params(baseline_model)
pruned_nonzero = count_nonzero_params(restored_pruned_model)


print("\n" + "=" * 60)
print("FINAL SUMMARY")
print("=" * 60)
print(f"Baseline test accuracy:                 {baseline_test:.2f}%")
print(f"Pruned test accuracy before finetune:   {pruned_test_before_ft:.2f}%")
print(f"Pruned test accuracy after finetune:    {pruned_test_after_ft:.2f}%")
print("-" * 60)
print(f"Baseline total params:                  {baseline_params:,}")
print(f"Pruned total params:                    {pruned_params:,}")
print(f"Baseline nonzero params:                {baseline_nonzero:,}")
print(f"Pruned nonzero params:                  {pruned_nonzero:,}")
print("-" * 60)
print(f"Baseline train time:                    {baseline_time/60:.2f} min")
print(f"Pruning/search time:                    {prune_elapsed/60:.2f} min")
print(f"Pruned finetune time:                   {ft_time/60:.2f} min")
print("=" * 60)


torch.save(baseline_model.state_dict(), "baseline_resnet20_final_state_dict.pth")
mto.save(restored_pruned_model, "pruned_resnet20_final_modelopt.pth")


print("\nSaved files:")
print(" - baseline_resnet20_final_state_dict.pth")
print(" - modelopt_pruned_model_fastnas.pth")
print(" - pruned_resnet20_final_modelopt.pth")
print(" - modelopt_search_checkpoint_fastnas.pth")


@torch.no_grad()
def show_sample_predictions(model, loader, n=8):
   model.eval()
   class_names = [
       "airplane", "automobile", "bird", "cat", "deer",
       "dog", "frog", "horse", "ship", "truck"
   ]
   images, labels = next(iter(loader))
   images = images[:n].to(device)
   labels = labels[:n]
   logits = model(images)
   preds = logits.argmax(dim=1).cpu()


   print("\nSample predictions:")
   for i in range(len(preds)):
       print(f"{i:02d} | pred={class_names[preds[i]]:<10} | true={class_names[labels[i]]}")


show_sample_predictions(restored_pruned_model, test_loader, n=8)

We restore the pruned model and verify its performance to ensure the pruning process succeeded. We fine-tune the model to recover accuracy lost during pruning and evaluate the final performance. We conclude by comparing metrics, saving artifacts, and running sample predictions to validate the optimized model end-to-end.
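When comparing the two models, it is often easier to read relative numbers than raw counts. The sketch below turns parameter counts and accuracies into a reduction percentage, a compression ratio, and an accuracy change. The function name `compression_report` is ours, and the inputs are placeholder values for illustration only, not results from this tutorial.

```python
def compression_report(baseline_params, pruned_params, baseline_acc, pruned_acc):
    """Return parameter reduction (%), compression ratio, and accuracy change (points)."""
    reduction = 100.0 * (baseline_params - pruned_params) / baseline_params
    return {
        "param_reduction_pct": reduction,
        "compression_ratio": baseline_params / pruned_params,
        "accuracy_delta_pts": pruned_acc - baseline_acc,
    }


# Placeholder inputs; substitute the values printed in the final summary.
report = compression_report(1_000_000, 600_000, baseline_acc=90.0, pruned_acc=89.0)
print(report)
```

In the notebook, the same helper could be fed `baseline_params`, `pruned_params`, `baseline_test`, and `pruned_test_after_ft`. Because structured pruning removes whole channels, the total parameter count itself should shrink, which is why the summary reports both total and nonzero counts.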

In conclusion, we moved beyond theory and built a complete model-optimization pipeline from scratch. We saw how a dense model is transformed into an efficient, compute-aware network through structured pruning, and how fine-tuning restores performance while retaining efficiency gains. We developed a strong intuition for FLOP constraints, automated architecture search, and how FastNAS navigates the trade-off between accuracy and efficiency. Most importantly, we walked away with a reusable workflow that we can apply to other models and datasets, enabling us to systematically design models that are not only accurate but also optimized for real-world deployment.



The post Step by Step Guide to Build an End-to-End Model Optimization Pipeline with NVIDIA Model Optimizer Using FastNAS Pruning and Fine-Tuning appeared first on MarkTechPost.
