How to Build an Agentic Deep Reinforcement Learning System with Curriculum Progression, Adaptive Exploration, and Meta-Level UCB Planning
In this tutorial, we construct an superior agentic Deep Reinforcement Learning system that guides an agent to study not solely actions inside an surroundings but in addition how to select its personal coaching methods. We design a Dueling Double DQN learner, introduce a curriculum with rising problem, and combine a number of exploration modes that adapt as coaching evolves. Most importantly, we assemble a meta-agent that plans, evaluates, and regulates the whole studying course of, permitting us to expertise how company transforms reinforcement studying right into a self-directed, strategic workflow. Check out the FULL CODES here.
!pip set up -q gymnasium[classic-control] torch matplotlib
import gymnasium as fitness center
import numpy as np
import torch, torch.nn as nn, torch.optim as optim
from collections import deque, defaultdict
import math, random, matplotlib.pyplot as plt
random.seed(0); np.random.seed(0); torch.manual_seed(0)
class DuelingQNet(nn.Module):
def __init__(self, obs_dim, act_dim):
tremendous().__init__()
hidden = 128
self.characteristic = nn.Sequential(
nn.Linear(obs_dim, hidden),
nn.ReLU(),
)
self.value_head = nn.Sequential(
nn.Linear(hidden, hidden),
nn.ReLU(),
nn.Linear(hidden, 1),
)
self.adv_head = nn.Sequential(
nn.Linear(hidden, hidden),
nn.ReLU(),
nn.Linear(hidden, act_dim),
)
def ahead(self, x):
h = self.characteristic(x)
v = self.value_head(h)
a = self.adv_head(h)
return v + (a - a.imply(dim=1, keepdim=True))
class ReplayBuffer:
def __init__(self, capability=100000):
self.buffer = deque(maxlen=capability)
def push(self, s,a,r,ns,d):
self.buffer.append((s,a,r,ns,d))
def pattern(self, batch_size):
batch = random.pattern(self.buffer, batch_size)
s,a,r,ns,d = zip(*batch)
def to_t(x, dt): return torch.tensor(x, dtype=dt, system=system)
return to_t(s,torch.float32), to_t(a,torch.lengthy), to_t(r,torch.float32), to_t(ns,torch.float32), to_t(d,torch.float32)
def __len__(self): return len(self.buffer)
We arrange the core construction of our deep reinforcement studying system. We initialize the surroundings, create the dueling Q-network, and put together the replay buffer to retailer transitions effectively. As we set up these foundations, we put together all the things our agent wants to start studying. Check out the FULL CODES here.
class DQNAgent:
def __init__(self, obs_dim, act_dim, gamma=0.99, lr=1e-3, batch_size=64):
self.q = DuelingQNet(obs_dim, act_dim).to(system)
self.tgt = DuelingQNet(obs_dim, act_dim).to(system)
self.tgt.load_state_dict(self.q.state_dict())
self.buf = ReplayBuffer()
self.decide = optim.Adam(self.q.parameters(), lr=lr)
self.gamma = gamma
self.batch_size = batch_size
self.global_step = 0
def _eps_value(self, step, begin=1.0, finish=0.05, decay=8000):
return finish + (begin - finish) * math.exp(-step/decay)
def select_action(self, state, mode, technique, softmax_temp=1.0):
s = torch.tensor(state, dtype=torch.float32, system=system).unsqueeze(0)
with torch.no_grad():
q_vals = self.q(s).cpu().numpy()[0]
if mode == "eval":
return int(np.argmax(q_vals)), None
if technique == "epsilon":
eps = self._eps_value(self.global_step)
if random.random() < eps:
return random.randrange(len(q_vals)), eps
return int(np.argmax(q_vals)), eps
if technique == "softmax":
logits = q_vals / softmax_temp
p = np.exp(logits - np.max(logits))
p /= p.sum()
return int(np.random.alternative(len(q_vals), p=p)), None
return int(np.argmax(q_vals)), None
def train_step(self):
if len(self.buf) < self.batch_size:
return None
s,a,r,ns,d = self.buf.pattern(self.batch_size)
with torch.no_grad():
next_q_online = self.q(ns)
next_actions = next_q_online.argmax(dim=1, keepdim=True)
next_q_target = self.tgt(ns).collect(1, next_actions).squeeze(1)
goal = r + self.gamma * next_q_target * (1 - d)
q_vals = self.q(s).collect(1, a.unsqueeze(1)).squeeze(1)
loss = nn.MSELoss()(q_vals, goal)
self.decide.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.q.parameters(), 1.0)
self.decide.step()
return float(loss.merchandise())
def update_target(self):
self.tgt.load_state_dict(self.q.state_dict())
def run_episodes(self, env, episodes, mode, technique):
returns = []
for _ in vary(episodes):
obs,_ = env.reset()
completed = False
ep_ret = 0.0
whereas not completed:
self.global_step += 1
a,_ = self.select_action(obs, mode, technique)
nobs, r, time period, trunc, _ = env.step(a)
completed = time period or trunc
if mode == "practice":
self.buf.push(obs, a, r, nobs, float(completed))
self.train_step()
obs = nobs
ep_ret += r
returns.append(ep_ret)
return float(np.imply(returns))
def evaluate_across_levels(self, ranges, episodes=5):
scores = {}
for title, max_steps in ranges.gadgets():
env = fitness center.make("CartPole-v1", max_episode_steps=max_steps)
avg = self.run_episodes(env, episodes, mode="eval", technique="epsilon")
env.shut()
scores[name] = avg
return scores
We outline how our agent observes the surroundings, chooses actions, and updates its neural community. We implement Double DQN logic, gradient updates, and exploration methods that allow the agent steadiness studying and discovery. As we end this snippet, we equip our agent with its full low-level studying capabilities. Check out the FULL CODES here.
class MetaAgent:
def __init__(self, agent):
self.agent = agent
self.ranges = {
"EASY": 100,
"MEDIUM": 300,
"HARD": 500,
}
self.plans = []
for diff in self.ranges.keys():
for mode in ["train", "eval"]:
for expl in ["epsilon", "softmax"]:
self.plans.append((diff, mode, expl))
self.counts = defaultdict(int)
self.values = defaultdict(float)
self.t = 0
self.historical past = []
def _ucb_score(self, plan, c=2.0):
n = self.counts[plan]
if n == 0:
return float("inf")
return self.values[plan] + c * math.sqrt(math.log(self.t+1) / n)
def select_plan(self):
self.t += 1
scores = [self._ucb_score(p) for p in self.plans]
return self.plans[int(np.argmax(scores))]
def make_env(self, diff):
max_steps = self.ranges[diff]
return fitness center.make("CartPole-v1", max_episode_steps=max_steps)
def meta_reward_fn(self, diff, mode, avg_return):
r = avg_return
if diff == "MEDIUM": r += 20
if diff == "HARD": r += 50
if mode == "eval" and diff == "HARD": r += 50
return r
def update_plan_value(self, plan, meta_reward):
self.counts[plan] += 1
n = self.counts[plan]
mu = self.values[plan]
self.values[plan] = mu + (meta_reward - mu) / n
def run(self, meta_rounds=30):
eval_log = {"EASY":[], "MEDIUM":[], "HARD":[]}
for okay in vary(1, meta_rounds+1):
diff, mode, expl = self.select_plan()
env = self.make_env(diff)
avg_ret = self.agent.run_episodes(env, 5 if mode=="practice" else 3, mode, expl if mode=="practice" else "epsilon")
env.shut()
if okay % 3 == 0:
self.agent.update_target()
meta_r = self.meta_reward_fn(diff, mode, avg_ret)
self.update_plan_value((diff,mode,expl), meta_r)
self.historical past.append((okay, diff, mode, expl, avg_ret, meta_r))
if mode == "eval":
eval_log[diff].append((okay, avg_ret))
print(f"{okay} {diff} {mode} {expl} {avg_ret:.1f} {meta_r:.1f}")
return eval_log
We design the agentic layer that decides how the agent ought to practice. We use a UCB bandit to choose problem ranges, modes, and exploration types primarily based on previous efficiency. As we repeatedly run these decisions, we observe the meta-agent strategically guiding the whole coaching course of. Check out the FULL CODES here.
tmp_env = fitness center.make("CartPole-v1", max_episode_steps=100)
obs_dim, act_dim = tmp_env.observation_space.form[0], tmp_env.action_space.n
tmp_env.shut()
agent = DQNAgent(obs_dim, act_dim)
meta = MetaAgent(agent)
eval_log = meta.run(meta_rounds=36)
final_scores = agent.evaluate_across_levels(meta.ranges, episodes=10)
print("Final Evaluation")
for okay, v in final_scores.gadgets():
print(okay, v)
We deliver all the things collectively by launching meta-rounds the place the meta-agent selects plans and the DQN agent executes them. We monitor how efficiency evolves and how the agent adapts to more and more troublesome duties. As this snippet runs, we see the emergence of long-horizon self-directed studying. Check out the FULL CODES here.
plt.determine(figsize=(9,4))
for diff, colour in [("EASY","tab:blue"), ("MEDIUM","tab:orange"), ("HARD","tab:red")]:
if eval_log[diff]:
x, y = zip(*eval_log[diff])
plt.plot(x, y, marker="o", label=f"{diff}")
plt.xlabel("Meta-Round")
plt.ylabel("Avg Return")
plt.title("Agentic Meta-Control Evaluation")
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.present()
We visualize how the agent performs throughout Easy, Medium, and Hard duties over time. We observe studying developments, enhancements, and the results of agentic planning mirrored within the curves. As we analyze these plots, we acquire perception into how strategic selections form the agent’s general progress.
In conclusion, we observe our agent evolve right into a system that learns on a number of ranges, refining its insurance policies, adjusting its exploration, and strategically choosing how to practice itself. We observe the meta-agent refine its selections by way of UCB-based planning and information the low-level learner towards more difficult duties and improved stability. With a deeper understanding of how agentic constructions amplify reinforcement studying, we will create techniques that plan, adapt, and optimize their very own enchancment over time.
Check out the FULL CODES here. Feel free to take a look at our GitHub Page for Tutorials, Codes and Notebooks. Also, be at liberty to observe us on Twitter and don’t overlook to be a part of our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
The put up How to Build an Agentic Deep Reinforcement Learning System with Curriculum Progression, Adaptive Exploration, and Meta-Level UCB Planning appeared first on MarkTechPost.
