|

How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning

In this tutorial, we discover Online Process Reward Learning (OPRL) and display how we are able to be taught dense, step-level reward alerts from trajectory preferences to clear up sparse-reward reinforcement studying duties. We stroll by way of every element, from the maze surroundings and reward-model community to choice era, coaching loops, and analysis, whereas observing how the agent steadily improves its behaviour by way of on-line preference-driven shaping. By operating this end-to-end implementation, we acquire a sensible understanding of how OPRL permits higher credit score task, quicker studying, and extra secure coverage optimization in difficult environments the place the agent would in any other case wrestle to uncover significant rewards. Check out the FULL CODE NOTEBOOK.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.practical as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random


torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


class MazeEnv:
   def __init__(self, measurement=8):
       self.measurement = measurement
       self.begin = (0, 0)
       self.purpose = (size-1, size-1)
       self.obstacles = set([(i, size//2) for i in range(1, size-2)])
       self.reset()
  
   def reset(self):
       self.pos = self.begin
       self.steps = 0
       return self._get_state()
  
   def _get_state(self):
       state = np.zeros(self.measurement * self.measurement)
       state[self.pos[0] * self.measurement + self.pos[1]] = 1
       return state
  
   def step(self, motion):
       strikes = [(-1,0), (0,1), (1,0), (0,-1)]
       new_pos = (self.pos[0] + strikes[action][0],
                  self.pos[1] + strikes[action][1])
       if (0 <= new_pos[0] < self.measurement and
           0 <= new_pos[1] < self.measurement and
           new_pos not in self.obstacles):
           self.pos = new_pos
       self.steps += 1
       achieved = self.pos == self.purpose or self.steps >= 60
       reward = 10.0 if self.pos == self.purpose else 0.0
       return self._get_state(), reward, achieved
  
   def render(self):
       grid = [['.' for _ in range(self.size)] for _ in vary(self.measurement)]
       for obs in self.obstacles:
           grid[obs[0]][obs[1]] = '█'
       grid[self.goal[0]][self.goal[1]] = 'G'
       grid[self.pos[0]][self.pos[1]] = 'A'
       return 'n'.be part of([''.join(row) for row in grid])


class ProcessRewardModel(nn.Module):
   def __init__(self, state_dim, hidden=128):
       tremendous().__init__()
       self.web = nn.Sequential(
           nn.Linear(state_dim, hidden),
           nn.LayerNorm(hidden),
           nn.ReLU(),
           nn.Linear(hidden, hidden),
           nn.LayerNorm(hidden),
           nn.ReLU(),
           nn.Linear(hidden, 1),
           nn.Tanh()
       )
   def ahead(self, states):
       return self.web(states)
   def trajectory_reward(self, states):
       return self.ahead(states).sum()


class PolicyCommunity(nn.Module):
   def __init__(self, state_dim, action_dim, hidden=128):
       tremendous().__init__()
       self.spine = nn.Sequential(
           nn.Linear(state_dim, hidden),
           nn.ReLU(),
           nn.Linear(hidden, hidden),
           nn.ReLU()
       )
       self.actor = nn.Linear(hidden, action_dim)
       self.critic = nn.Linear(hidden, 1)
   def ahead(self, state):
       options = self.spine(state)
       return self.actor(options), self.critic(options)

We arrange all the basis of our OPRL system by importing libraries, defining the maze surroundings, and constructing the reward and coverage networks. We set up how states are represented, how obstacles block motion, and the way the sparse reward construction works. We additionally design the core neural fashions that may later be taught course of rewards and drive the coverage’s choices. Check out the FULL CODE NOTEBOOK.

class OPRLAgent:
   def __init__(self, state_dim, action_dim, lr=3e-4):
       self.coverage = PolicyCommunity(state_dim, action_dim)
       self.reward_model = ProcessRewardModel(state_dim)
       self.policy_opt = Adam(self.coverage.parameters(), lr=lr)
       self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
       self.trajectories = deque(maxlen=200)
       self.preferences = deque(maxlen=500)
       self.action_dim = action_dim
  
   def select_action(self, state, epsilon=0.1):
       if random.random() < epsilon:
           return random.randint(0, self.action_dim - 1)
       state_t = torch.FloatTensor(state).unsqueeze(0)
       with torch.no_grad():
           logits, _ = self.coverage(state_t)
           probs = F.softmax(logits, dim=-1)
           return torch.multinomial(probs, 1).merchandise()
  
   def collect_trajectory(self, env, epsilon=0.1):
       states, actions, rewards = [], [], []
       state = env.reset()
       achieved = False
       whereas not achieved:
           motion = self.select_action(state, epsilon)
           next_state, reward, achieved = env.step(motion)
           states.append(state)
           actions.append(motion)
           rewards.append(reward)
           state = next_state
       traj = {
           'states': torch.FloatTensor(np.array(states)),
           'actions': torch.LongTensor(actions),
           'rewards': torch.FloatTensor(rewards),
           'return': float(sum(rewards))
       }
       self.trajectories.append(traj)
       return traj

We start developing the OPRL agent by implementing motion choice and trajectory assortment. We use an ε-greedy technique to guarantee exploration and collect sequences of states, actions, and returns. As we run the agent by way of the maze, we retailer total trajectories that may later function choice information for shaping the reward mannequin. Check out the FULL CODE NOTEBOOK.

  def generate_preference(self):
       if len(self.trajectories) < 2:
           return
       t1, t2 = random.pattern(record(self.trajectories), 2)
       label = 1.0 if t1['return'] > t2['return'] else 0.0
       self.preferences.append({'t1': t1, 't2': t2, 'label': label})
  
   def train_reward_model(self, n_updates=5):
       if len(self.preferences) < 32:
           return 0.0
       total_loss = 0.0
       for _ in vary(n_updates):
           batch = random.pattern(record(self.preferences), 32)
           loss = 0.0
           for merchandise in batch:
               r1 = self.reward_model.trajectory_reward(merchandise['t1']['states'])
               r2 = self.reward_model.trajectory_reward(merchandise['t2']['states'])
               logit = r1 - r2
               pred_prob = torch.sigmoid(logit)
               label = merchandise['label']
               loss += -(label * torch.log(pred_prob + 1e-8) +
                        (1-label) * torch.log(1 - pred_prob + 1e-8))
           loss = loss / len(batch)
           self.reward_opt.zero_grad()
           loss.backward()
           torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
           self.reward_opt.step()
           total_loss += loss.merchandise()
       return total_loss / n_updates

We generate choice pairs from collected trajectories and prepare the method reward mannequin utilizing the Bradley–Terry formulation. We evaluate trajectory-level scores, compute chances, and replace the reward mannequin to mirror which behaviours seem higher. This permits us to be taught dense, differentiable, step-level rewards that information the agent even when the surroundings itself is sparse. Check out the FULL CODE NOTEBOOK.

 def train_policy(self, n_updates=3, gamma=0.98):
       if len(self.trajectories) < 5:
           return 0.0
       total_loss = 0.0
       for _ in vary(n_updates):
           traj = random.alternative(record(self.trajectories))
           with torch.no_grad():
               process_rewards = self.reward_model(traj['states']).squeeze()
           shaped_rewards = traj['rewards'] + 0.1 * process_rewards
           returns = []
           G = 0
           for r in reversed(shaped_rewards.tolist()):
               G = r + gamma * G
               returns.insert(0, G)
           returns = torch.FloatTensor(returns)
           returns = (returns - returns.imply()) / (returns.std() + 1e-8)
           logits, values = self.coverage(traj['states'])
           log_probs = F.log_softmax(logits, dim=-1)
           action_log_probs = log_probs.collect(1, traj['actions'].unsqueeze(1))
           benefits = returns - values.squeeze().detach()
           policy_loss = -(action_log_probs.squeeze() * benefits).imply()
           value_loss = F.mse_loss(values.squeeze(), returns)
           entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).imply()
           loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
           self.policy_opt.zero_grad()
           loss.backward()
           torch.nn.utils.clip_grad_norm_(self.coverage.parameters(), 1.0)
           self.policy_opt.step()
           total_loss += loss.merchandise()
       return total_loss / n_updates


def train_oprl(episodes=500, render_interval=100):
   env = MazeEnv(measurement=8)
   agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
   returns, reward_losses, policy_losses = [], [], []
   success_rate = []
   for ep in vary(episodes):
       epsilon = max(0.05, 0.5 - ep / 1000)
       traj = agent.collect_trajectory(env, epsilon)
       returns.append(traj['return'])
       if ep % 2 == 0 and ep > 10:
           agent.generate_preference()
       if ep > 20 and ep % 2 == 0:
           rew_loss = agent.train_reward_model(n_updates=3)
           reward_losses.append(rew_loss)
       if ep > 10:
           pol_loss = agent.train_policy(n_updates=2)
           policy_losses.append(pol_loss)
       success = 1 if traj['return'] > 5 else 0
       success_rate.append(success)
       if ep % render_interval == 0 and ep > 0:
           test_env = MazeEnv(measurement=8)
           agent.collect_trajectory(test_env, epsilon=0)
           print(test_env.render())
   return returns, reward_losses, policy_losses, success_rate

We prepare the coverage utilizing formed rewards produced by the discovered course of reward mannequin. We compute returns, benefits, worth estimates, and entropy bonuses, enabling the agent to enhance its technique over time. We then construct a full coaching loop wherein exploration decays, preferences accumulate, and each the reward mannequin and the coverage are up to date constantly. Check out the FULL CODE NOTEBOOK.

print("Training OPRL Agent on Sparse Reward Maze...n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)


fig, axes = plt.subplots(2, 2, figsize=(14, 10))


axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode='legitimate'), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Performance')
axes[0,0].grid(alpha=0.3)


success_smooth = np.convolve(success, np.ones(20)/20, mode='legitimate')
axes[0,1].plot(success_smooth, linewidth=2, colour='inexperienced')
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Rate')
axes[0,1].set_title('Goal Success Rate')
axes[0,1].grid(alpha=0.3)


axes[1,0].plot(rew_losses, linewidth=2, colour='orange')
axes[1,0].set_xlabel('Update Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Model Loss')
axes[1,0].grid(alpha=0.3)


axes[1,1].plot(pol_losses, linewidth=2, colour='pink')
axes[1,1].set_xlabel('Update Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Policy Loss')
axes[1,1].grid(alpha=0.3)


plt.tight_layout()
plt.present()


print("OPRL Training Complete!")
print("Process rewards, choice studying, reward shaping, and on-line updates demonstrated.")

We visualize the training dynamics by plotting returns, success charges, reward-model loss, and coverage loss. We monitor how the agent’s efficiency evolves as OPRL shapes the reward panorama. By the top of the visualization, we clearly see the influence of course of rewards on fixing a difficult, sparse-reward maze.

In conclusion, we see how OPRL transforms sparse terminal outcomes into wealthy on-line suggestions that constantly guides the agent’s behaviour. We watch the method reward mannequin be taught preferences, form the return sign, and speed up the coverage’s capability to attain the purpose. With bigger mazes, various shaping strengths, and even actual human choice suggestions, we respect how OPRL offers a versatile and highly effective framework for credit score task in complicated decision-making duties. We end with a transparent, hands-on understanding of how OPRL operates and the way we are able to prolong it to extra superior agentic RL settings.


Check out the FULL CODE NOTEBOOK and Paper. Feel free to try our GitHub Page for Tutorials, Codes and Notebooks. Also, be happy to comply with us on Twitter and don’t overlook to be part of our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

The publish How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning appeared first on MarkTechPost.

Similar Posts