Implementing DeepSpeed for Scalable Transformers: Advanced Training with Gradient Checkpointing and Parallelism

In this advanced DeepSpeed tutorial, we provide a hands-on walkthrough of cutting-edge optimization techniques for training large language models efficiently. By combining ZeRO optimization, mixed-precision training, gradient accumulation, and advanced DeepSpeed configurations, the tutorial demonstrates how to maximize GPU memory utilization, reduce training overhead, and enable scaling of transformer models in resource-constrained environments such as Colab. Alongside model creation and training, it also covers performance monitoring, inference optimization, checkpointing, and benchmarking different ZeRO stages, giving practitioners both theoretical insight and practical code to accelerate model development. Check out the FULL CODES here.
import subprocess
import sys
import os
import json
import time
from pathlib import Path


def install_dependencies():
    """Install required packages for DeepSpeed in Colab"""
    print("Installing DeepSpeed and dependencies...")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "torch", "torchvision", "torchaudio", "--index-url",
        "https://download.pytorch.org/whl/cu118"
    ])
    subprocess.check_call([sys.executable, "-m", "pip", "install", "deepspeed"])
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "transformers", "datasets", "accelerate", "wandb"
    ])
    print("Installation complete!")


install_dependencies()

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import deepspeed
from transformers import GPT2Config, GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
from typing import Dict, Any
import argparse
We set up our Colab environment by installing PyTorch with CUDA support, DeepSpeed, and essential libraries such as Transformers, Datasets, Accelerate, and Weights & Biases. We make sure everything is in place so we can smoothly build and train models with DeepSpeed. Check out the FULL CODES here.
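As an optional sanity check after installation (not part of the original script), you can confirm the installed versions and let DeepSpeed's own diagnostic CLI, ds_report, summarize which optional ops are compatible with your environment:

# Optional post-install sanity check (illustrative only).
import torch
import deepspeed

print("PyTorch:", torch.__version__, "| CUDA available:", torch.cuda.is_available())
print("DeepSpeed:", deepspeed.__version__)

# In a Colab cell, DeepSpeed's diagnostic report can be run as a shell command:
# !ds_report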
class SyntheticTextDataset(Dataset):
    """Synthetic dataset for demonstration purposes"""
    def __init__(self, size: int = 1000, seq_length: int = 512, vocab_size: int = 50257):
        self.size = size
        self.seq_length = seq_length
        self.vocab_size = vocab_size
        self.data = torch.randint(0, vocab_size, (size, seq_length))

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        return {
            'input_ids': self.data[idx],
            'labels': self.data[idx].clone()
        }
We create a SyntheticTextDataset that generates random token sequences to mimic real text data. We use these sequences as both inputs and labels, which lets us quickly test DeepSpeed training without relying on a large external dataset. Check out the FULL CODES here.
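As a minimal sketch (not in the original code), assuming the SyntheticTextDataset class above, you can wrap it in a DataLoader and inspect one batch to confirm the tensor shapes the trainer will receive:

# Tiny smoke test for the synthetic dataset (illustrative only).
from torch.utils.data import DataLoader

demo_ds = SyntheticTextDataset(size=8, seq_length=16, vocab_size=50257)
demo_loader = DataLoader(demo_ds, batch_size=4, shuffle=True)

batch = next(iter(demo_loader))
print(batch['input_ids'].shape)   # torch.Size([4, 16])
print(batch['labels'].shape)      # torch.Size([4, 16])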
class AdvancedDeepSpeedTrainer:
    """Advanced DeepSpeed trainer with multiple optimization techniques"""
    def __init__(self, model_config: Dict[str, Any], ds_config: Dict[str, Any]):
        self.model_config = model_config
        self.ds_config = ds_config
        self.model = None
        self.engine = None
        self.tokenizer = None

    def create_model(self):
        """Create a GPT-2 style model for demonstration"""
        print("Creating model...")
        config = GPT2Config(
            vocab_size=self.model_config['vocab_size'],
            n_positions=self.model_config['seq_length'],
            n_embd=self.model_config['hidden_size'],
            n_layer=self.model_config['num_layers'],
            n_head=self.model_config['num_heads'],
            resid_pdrop=0.1,
            embd_pdrop=0.1,
            attn_pdrop=0.1,
        )
        self.model = GPT2LMHeadModel(config)
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.tokenizer.pad_token = self.tokenizer.eos_token
        print(f"Model parameters: {sum(p.numel() for p in self.model.parameters()):,}")
        return self.model
    def create_deepspeed_config(self):
        """Create a comprehensive DeepSpeed configuration"""
        return {
            "train_batch_size": self.ds_config['train_batch_size'],
            "train_micro_batch_size_per_gpu": self.ds_config['micro_batch_size'],
            "gradient_accumulation_steps": self.ds_config['gradient_accumulation_steps'],
            "zero_optimization": {
                "stage": self.ds_config['zero_stage'],
                "allgather_partitions": True,
                "allgather_bucket_size": 5e8,
                "overlap_comm": True,
                "reduce_scatter": True,
                "reduce_bucket_size": 5e8,
                "contiguous_gradients": True,
                "cpu_offload": self.ds_config.get('cpu_offload', False)
            },
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "loss_scale_window": 1000,
                "initial_scale_power": 16,
                "hysteresis": 2,
                "min_loss_scale": 1
            },
            "optimizer": {
                "type": "AdamW",
                "params": {
                    "lr": self.ds_config['learning_rate'],
                    "betas": [0.9, 0.999],
                    "eps": 1e-8,
                    "weight_decay": 0.01
                }
            },
            "scheduler": {
                "type": "WarmupLR",
                "params": {
                    "warmup_min_lr": 0,
                    "warmup_max_lr": self.ds_config['learning_rate'],
                    "warmup_num_steps": 100
                }
            },
            "gradient_clipping": 1.0,
            "wall_clock_breakdown": True,
            "memory_breakdown": True,
            "tensorboard": {
                "enabled": True,
                "output_path": "./logs/",
                "job_name": "deepspeed_advanced_tutorial"
            }
        }
    def initialize_deepspeed(self):
        """Initialize the DeepSpeed engine"""
        print("Initializing DeepSpeed...")
        parser = argparse.ArgumentParser()
        parser.add_argument('--local_rank', type=int, default=0)
        args = parser.parse_args([])
        self.engine, optimizer, _, lr_scheduler = deepspeed.initialize(
            args=args,
            model=self.model,
            config=self.create_deepspeed_config()
        )
        print(f"DeepSpeed engine initialized with ZeRO stage {self.ds_config['zero_stage']}")
        return self.engine

    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Perform a single training step with DeepSpeed optimizations"""
        input_ids = batch['input_ids'].to(self.engine.device)
        labels = batch['labels'].to(self.engine.device)
        outputs = self.engine(input_ids=input_ids, labels=labels)
        loss = outputs.loss
        self.engine.backward(loss)
        self.engine.step()
        return {
            'loss': loss.item(),
            'lr': self.engine.lr_scheduler.get_last_lr()[0] if self.engine.lr_scheduler else 0
        }
    def train(self, dataloader: DataLoader, num_epochs: int = 2):
        """Complete training loop with monitoring"""
        print(f"Starting training for {num_epochs} epochs...")
        self.engine.train()
        total_steps = 0
        for epoch in range(num_epochs):
            epoch_loss = 0.0
            epoch_steps = 0
            print(f"\nEpoch {epoch + 1}/{num_epochs}")
            for step, batch in enumerate(dataloader):
                start_time = time.time()
                metrics = self.train_step(batch)
                epoch_loss += metrics['loss']
                epoch_steps += 1
                total_steps += 1
                if step % 10 == 0:
                    step_time = time.time() - start_time
                    print(f"  Step {step:4d} | Loss: {metrics['loss']:.4f} | "
                          f"LR: {metrics['lr']:.2e} | Time: {step_time:.3f}s")
                if step % 20 == 0 and hasattr(self.engine, 'monitor'):
                    self.log_memory_stats()
                if step >= 50:
                    break
            avg_loss = epoch_loss / epoch_steps
            print(f"Epoch {epoch + 1} completed | Average Loss: {avg_loss:.4f}")
        print("Training completed!")
    def log_memory_stats(self):
        """Log GPU memory statistics"""
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / 1024**3
            reserved = torch.cuda.memory_reserved() / 1024**3
            print(f"GPU Memory - Allocated: {allocated:.2f}GB | Reserved: {reserved:.2f}GB")

    def save_checkpoint(self, path: str):
        """Save a model checkpoint using DeepSpeed"""
        print(f"Saving checkpoint to {path}")
        self.engine.save_checkpoint(path)

    def demonstrate_inference(self, text: str = "The future of AI is"):
        """Demonstrate optimized inference with DeepSpeed"""
        print(f"\nRunning inference with prompt: '{text}'")
        inputs = self.tokenizer.encode(text, return_tensors='pt').to(self.engine.device)
        self.engine.eval()
        with torch.no_grad():
            outputs = self.engine.module.generate(
                inputs,
                max_length=inputs.shape[1] + 50,
                num_return_sequences=1,
                temperature=0.8,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(f"Generated text: {generated_text}")
        self.engine.train()
We build an end-to-end trainer that creates a GPT-2 model, sets up a DeepSpeed config (ZeRO, FP16, AdamW, a warmup scheduler, and TensorBoard logging), and initializes the engine. We then run efficient training steps with logging and memory statistics, save checkpoints, and demonstrate inference to verify optimization and generation in one place. Check out the FULL CODES here.
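One detail worth making explicit (a short illustrative check, not part of the original script) is how DeepSpeed ties the batch-size fields together: train_batch_size must equal train_micro_batch_size_per_gpu × gradient_accumulation_steps × the number of GPUs. The values used in this tutorial (16 = 4 × 4 × 1) satisfy that on a single Colab GPU:

# Illustrative consistency check for the batch-size fields in the DeepSpeed config.
micro_batch_size = 4             # train_micro_batch_size_per_gpu
gradient_accumulation_steps = 4
world_size = 1                   # single GPU in Colab

effective_batch = micro_batch_size * gradient_accumulation_steps * world_size
assert effective_batch == 16     # must match "train_batch_size" in the config
print(f"Effective train_batch_size: {effective_batch}")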
def run_advanced_tutorial():
    """Main function to run the advanced DeepSpeed tutorial"""
    print("Advanced DeepSpeed Tutorial Starting...")
    print("=" * 60)
    model_config = {
        'vocab_size': 50257,
        'seq_length': 512,
        'hidden_size': 768,
        'num_layers': 6,
        'num_heads': 12
    }
    ds_config = {
        'train_batch_size': 16,
        'micro_batch_size': 4,
        'gradient_accumulation_steps': 4,
        'zero_stage': 2,
        'learning_rate': 1e-4,
        'cpu_offload': False
    }
    print("Configuration:")
    # Rough parameter estimate: embeddings plus ~12 * hidden^2 weights per transformer layer.
    est_params = (model_config['vocab_size'] * model_config['hidden_size']
                  + model_config['num_layers'] * 12 * model_config['hidden_size'] ** 2)
    print(f"  Model size: ~{est_params / 1e6:.1f}M parameters")
    print(f"  ZeRO Stage: {ds_config['zero_stage']}")
    print(f"  Batch size: {ds_config['train_batch_size']}")
    trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)
    model = trainer.create_model()
    engine = trainer.initialize_deepspeed()
    print("\nCreating synthetic dataset...")
    dataset = SyntheticTextDataset(
        size=200,
        seq_length=model_config['seq_length'],
        vocab_size=model_config['vocab_size']
    )
    dataloader = DataLoader(
        dataset,
        batch_size=ds_config['micro_batch_size'],
        shuffle=True
    )
    print("\nPre-training memory stats:")
    trainer.log_memory_stats()
    trainer.train(dataloader, num_epochs=2)
    print("\nPost-training memory stats:")
    trainer.log_memory_stats()
    trainer.demonstrate_inference("DeepSpeed enables efficient training of")
    checkpoint_path = "./deepspeed_checkpoint"
    trainer.save_checkpoint(checkpoint_path)
    demonstrate_zero_stages()
    demonstrate_memory_optimization()
    print("\nTutorial completed successfully!")
    print("Key DeepSpeed features demonstrated:")
    print("  - ZeRO optimization for memory efficiency")
    print("  - Mixed precision training (FP16)")
    print("  - Gradient accumulation")
    print("  - Learning rate scheduling")
    print("  - Checkpoint saving/loading")
    print("  - Memory monitoring")
def demonstrate_zero_stages():
    """Demonstrate the different ZeRO optimization stages"""
    print("\nZeRO Optimization Stages Explained:")
    print("  Stage 0: Disabled (baseline)")
    print("  Stage 1: Optimizer state partitioning (~4x memory reduction)")
    print("  Stage 2: Gradient partitioning (~8x memory reduction)")
    print("  Stage 3: Parameter partitioning (~Nx memory reduction)")
    zero_configs = {
        1: {"stage": 1, "reduce_bucket_size": 5e8},
        2: {"stage": 2, "allgather_partitions": True, "reduce_scatter": True},
        3: {"stage": 3, "stage3_prefetch_bucket_size": 5e8, "stage3_param_persistence_threshold": 1e6}
    }
    for stage, config in zero_configs.items():
        estimated_memory_reduction = [1, 4, 8, "N"][stage]
        print(f"  Stage {stage}: ~{estimated_memory_reduction}x memory reduction")
def demonstrate_memory_optimization():
    """Show memory optimization techniques"""
    print("\nMemory Optimization Techniques:")
    print("  - Gradient Checkpointing: trade compute for memory")
    print("  - CPU Offloading: move optimizer states to CPU")
    print("  - Compression: reduce communication overhead")
    print("  - Mixed Precision: use FP16 for faster training")
We orchestrate the full training run: set the configs, build the GPT-2 model and DeepSpeed engine, create a synthetic dataset, monitor GPU memory, train for two epochs, run inference, and save a checkpoint. We then explain the ZeRO stages and highlight memory-optimization tactics such as gradient checkpointing and CPU offloading (see the sketch below) to understand the trade-offs in practice. Check out the FULL CODES here.
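The title mentions gradient checkpointing, but the script above never switches it on. As a minimal sketch, assuming you reuse the AdvancedDeepSpeedTrainer and config dicts defined earlier, you could enable activation checkpointing on the Hugging Face model and pair it with ZeRO-2 CPU offloading like this:

# Illustrative sketch: gradient checkpointing + CPU offloading
# (assumes the AdvancedDeepSpeedTrainer class and config layout defined above).
trainer = AdvancedDeepSpeedTrainer(
    model_config={'vocab_size': 50257, 'seq_length': 512,
                  'hidden_size': 768, 'num_layers': 6, 'num_heads': 12},
    ds_config={'train_batch_size': 16, 'micro_batch_size': 4,
               'gradient_accumulation_steps': 4, 'zero_stage': 2,
               'learning_rate': 1e-4, 'cpu_offload': True},  # offload optimizer states to CPU
)
model = trainer.create_model()

# Hugging Face models support activation (gradient) checkpointing natively:
model.gradient_checkpointing_enable()
model.config.use_cache = False  # KV caching is incompatible with checkpointing during training

engine = trainer.initialize_deepspeed()  # picks up cpu_offload via create_deepspeed_config()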
class DeepSpeedConfigGenerator:
    """Utility class to generate DeepSpeed configurations"""
    @staticmethod
    def generate_config(
        batch_size: int = 16,
        zero_stage: int = 2,
        use_cpu_offload: bool = False,
        learning_rate: float = 1e-4
    ) -> Dict[str, Any]:
        """Generate a complete DeepSpeed configuration"""
        config = {
            "train_batch_size": batch_size,
            "train_micro_batch_size_per_gpu": max(1, batch_size // 4),
            "gradient_accumulation_steps": max(1, batch_size // max(1, batch_size // 4)),
            "zero_optimization": {
                "stage": zero_stage,
                "allgather_partitions": True,
                "allgather_bucket_size": 5e8,
                "overlap_comm": True,
                "reduce_scatter": True,
                "reduce_bucket_size": 5e8,
                "contiguous_gradients": True
            },
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "loss_scale_window": 1000,
                "initial_scale_power": 16,
                "hysteresis": 2,
                "min_loss_scale": 1
            },
            "optimizer": {
                "type": "AdamW",
                "params": {
                    "lr": learning_rate,
                    "betas": [0.9, 0.999],
                    "eps": 1e-8,
                    "weight_decay": 0.01
                }
            },
            "scheduler": {
                "type": "WarmupLR",
                "params": {
                    "warmup_min_lr": 0,
                    "warmup_max_lr": learning_rate,
                    "warmup_num_steps": 100
                }
            },
            "gradient_clipping": 1.0,
            "wall_clock_breakdown": True
        }
        if use_cpu_offload:
            config["zero_optimization"]["cpu_offload"] = True
            config["zero_optimization"]["pin_memory"] = True
        if zero_stage == 3:
            config["zero_optimization"].update({
                "stage3_prefetch_bucket_size": 5e8,
                "stage3_param_persistence_threshold": 1e6,
                "stage3_gather_16bit_weights_on_model_save": True
            })
        return config
def benchmark_zero_stages():
    """Benchmark the different ZeRO stages"""
    print("\nBenchmarking ZeRO Stages...")
    model_config = {
        'vocab_size': 50257,
        'seq_length': 256,
        'hidden_size': 512,
        'num_layers': 4,
        'num_heads': 8
    }
    results = {}
    for stage in [1, 2]:
        print(f"\nTesting ZeRO Stage {stage}...")
        ds_config = {
            'train_batch_size': 8,
            'micro_batch_size': 2,
            'gradient_accumulation_steps': 4,
            'zero_stage': stage,
            'learning_rate': 1e-4
        }
        try:
            trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)
            model = trainer.create_model()
            engine = trainer.initialize_deepspeed()
            if torch.cuda.is_available():
                torch.cuda.reset_peak_memory_stats()
            dataset = SyntheticTextDataset(size=20, seq_length=model_config['seq_length'])
            dataloader = DataLoader(dataset, batch_size=ds_config['micro_batch_size'])
            start_time = time.time()
            for i, batch in enumerate(dataloader):
                if i >= 5:
                    break
                trainer.train_step(batch)
            end_time = time.time()
            peak_memory = torch.cuda.max_memory_allocated() / 1024**3
            results[stage] = {
                'peak_memory_gb': peak_memory,
                'time_per_step': (end_time - start_time) / 5
            }
            print(f"  Peak Memory: {peak_memory:.2f}GB")
            print(f"  Time per step: {results[stage]['time_per_step']:.3f}s")
            del trainer, model, engine
            torch.cuda.empty_cache()
        except Exception as e:
            print(f"  Error with stage {stage}: {str(e)}")
    if len(results) > 1:
        print("\nComparison:")
        stage_1_memory = results.get(1, {}).get('peak_memory_gb', 0)
        stage_2_memory = results.get(2, {}).get('peak_memory_gb', 0)
        if stage_1_memory > 0 and stage_2_memory > 0:
            memory_reduction = (stage_1_memory - stage_2_memory) / stage_1_memory * 100
            print(f"  Memory reduction from Stage 1 to 2: {memory_reduction:.1f}%")
def demonstrate_advanced_features():
    """Demonstrate additional advanced DeepSpeed features"""
    print("\nAdvanced DeepSpeed Features:")
    print("  - Dynamic Loss Scaling: automatically adjusts FP16 loss scaling")
    print("  - Gradient Compression: reduces communication overhead")
    print("  - Pipeline Parallelism: splits the model across devices")
    print("  - Expert Parallelism: efficient Mixture-of-Experts training")
    print("  - Curriculum Learning: progressive training strategies")
if __name__ == "__main__":
    print(f"CUDA Available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"  GPU: {torch.cuda.get_device_name()}")
        print(f"  Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")
    try:
        run_advanced_tutorial()
        benchmark_zero_stages()
        demonstrate_advanced_features()
    except Exception as e:
        print(f"Error during tutorial: {str(e)}")
        print("Tips for troubleshooting:")
        print("  - Ensure you have a GPU runtime enabled in Colab")
        print("  - Try reducing batch_size or the model size if you hit memory issues")
        print("  - Enable CPU offloading in ds_config if needed")
We generate reusable DeepSpeed configurations, benchmark ZeRO stages to compare memory and speed, and showcase advanced features such as dynamic loss scaling and pipeline/MoE parallelism. We also detect CUDA, run the full tutorial end to end, and print clear troubleshooting tips, allowing us to iterate confidently in Colab.
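As a short usage sketch (an illustrative addition, not part of the original script), the generated configuration can be dumped to a JSON file, which is the form the DeepSpeed launcher typically consumes via its config argument:

# Illustrative: generate a ZeRO-3 config with CPU offloading and save it to disk.
import json

config = DeepSpeedConfigGenerator.generate_config(
    batch_size=16,
    zero_stage=3,
    use_cpu_offload=True,
    learning_rate=1e-4,
)

with open("ds_config.json", "w") as f:
    json.dump(config, f, indent=2)

# Outside a notebook, the same file can be passed to a launcher-driven script, e.g.:
#   deepspeed train.py --deepspeed --deepspeed_config ds_config.json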
In conclusion, we gain a comprehensive understanding of how DeepSpeed improves training efficiency by balancing performance against memory trade-offs. From leveraging ZeRO stages for memory reduction to applying FP16 mixed precision and CPU offloading, the tutorial showcases powerful techniques that make large-scale training accessible on modest hardware. By the end, learners will have trained and optimized a GPT-style model, benchmarked configurations, monitored GPU resources, and explored advanced features such as pipeline parallelism and gradient compression.