TP RL 3 : DQN¶
Ce TP est disponible dans Google Colab ici,
ou peut être téléchargé comme notebook ici
TP RL 3 : DQN¶
Dans ce TP nous allons nous interesser au Deep Q Networks et leur implémentation pour une tâche classique de RL : le cartpole. Il s’agit d’apprendre à un agent à faire tenir un mât en équilibre sur un véhicule dans une scène 2D. L’agent à deux action possibles : gauche ou droite et reçoit une reward négative lorque le mat tombre de la voiture. Pour cela, nous allons utiliser la bibliothèque gym proposant toute sortes d’environnements et de benchmarks. La reward est proportionnelle au temps de l’épisode.
Commençons par charger quelques utilitaires
Risque de ne pas marcher sur jupyterhub –> Passer sur ce colab ou en local en installant les paquets suivants
! pip install gym==0.26.2
! apt-get install xvfb # ne marche pas sur jupyterhub
! pip install gym-notebook-wrapper
import gnwrapper
import gym
Initialisation de l’environnement et première visualisation (optionnel si vous n’utilisez pas JupyterHub)¶
env = gym.make('CartPole-v1')
env = gnwrapper.LoopAnimation(gym.make('CartPole-v1', render_mode="rgb_array"))
obs = env.reset()
for _ in range(100):
next_obs, reward, term, trunc, info = env.step(env.action_space.sample())
env.render()
obs = next_obs
if term or trunc:
obs = env.reset()
env.display()
Chargement des librairies¶
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gym
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DQN Model¶
Pour commencer nous allons construire notre réseaux chargé d’approcher la fonction de valeur état-action \(Q(s,a)\).
Coding task¶
Compléter le code et implémenter le DQN suivant:
deux couches linéaires de dimensions (in_channels x 128), (128 x 128)
Une couche linéaire de taille (128 x action_space)
Activation ReLU après chaque couche sauf la dernière
class DQN(nn.Module):
def __init__(self, n_observations, n_actions):
super(DQN, self).__init__()
# Coder ici
# Coder ici
# Coder ici
def forward(self, x):
x = # ...
x = # ...
return # ...
Nous utiliserons la mémoire des expériences pour entraîner notre DQN. Elle stocke les transitions que l’agent observe, ce qui nous permet de réutiliser ces données ultérieurement. En échantillonnant de manière aléatoire dans cette mémoire, les transitions qui constituent un lot sont décorrelées. Il a été démontré que cela stabilise grandement et améliore la procédure d’entraînement du DQN
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
"""Save a transition"""
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
# BATCH_SIZE est le nombre de transitions échantillonnées depuis le tampon (buffer) de rejeu
# GAMMA est le facteur d'actualisation comme mentionné dans la section précédente
# EPS_START est la valeur initiale d'epsilon
# EPS_END est la valeur finale d'epsilon
# EPS_DECAY contrôle le taux de décroissance exponentielle d'epsilon, une valeur plus élevée signifie une décroissance plus lente
# TAU est le taux de mise à jour du réseau cible
# LR est le taux d'apprentissage de l'optimiseur ``AdamW``
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
# Récupère le nombre d'action de l'environnement
n_actions = env.action_space.n
# Récupère le nombre d'observation
state, info = env.reset()
n_observations = len(state)
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
# Politique epsilon greedy
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) retournera la plus grande valeur de colonne pour chaque ligne.
# La deuxième colonne du résultat max est l'indice où l'élément maximum a été
# trouvé, donc nous choisissons l'action avec la récompense attendue la plus élevée.
return policy_net(state).max(1)[1].view(1, 1)
else:
# exploration
return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations(show_result=False):
plt.figure(1)
durations_t = torch.tensor(episode_durations, dtype=torch.float)
if show_result:
plt.title('Result')
else:
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Prend la moyenne de la durée sur 100 épisodes afin de lisser l'affichage
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001)
if is_ipython:
if not show_result:
display.display(plt.gcf())
display.clear_output(wait=True)
else:
display.display(plt.gcf())
Entraînement¶
Etape d’optimisation¶
Coding Task¶
La fonction de coût est calculée comme la différence entre les valeurs Q actuelles (calculées par le réseau de politique) et les valeurs Q attendues. C’est la fonction de Bellman qu’on utilise dans QLearning et TD(0). La fonction de coût utilisée est la SmoothL1Loss, également connue sous le nom de Huber Loss.
Effectuer la mise à jour des paramètres en remettant les gradients à zéro et en rétropropageant le gradient
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
batch = Transition(*zip(*transitions))
# Calculez un masque des états non-finis et concaténez les éléments du batch
# (un état final aurait été celui après lequel la simulation s'est terminée)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# Calculez Q(s_t, a) - le modèle calcule Q(s_t), puis nous sélectionnons les
# colonnes des actions effectuées. Ce sont les actions qui auraient été effectuées
# pour chaque état du batch selon policy_net
state_action_values = policy_net(state_batch).gather(1, action_batch)
# Calculez V(s_{t+1}) pour tous les états suivants.
# Les valeurs attendues des actions pour non_final_next_states sont calculées
# sur la base du "plus ancien" target_net ; en sélectionnant leur meilleure récompense avec max(1)[0].
# Ceci est fusionné en fonction du masque, de sorte que nous aurons soit la valeur d'état attendue,
# soit 0 dans le cas où l'état était final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
with torch.no_grad():
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
# Calculez les valeurs Q attendues
expected_state_action_values = # ... # Equation de Bellman
# Calculez la Huber Loss
criterion = nn.SmoothL1Loss()
loss = # ... # Appel à criterion
# Optimisation du modèle
optimizer.zero_grad()
loss.backward()
# gradient-clipping pour éviter saturation du gradient
torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
optimizer.step()
Boucle d’apprentissage¶
Coding Task¶
Stocker les transitions en mémoire
Appeler la fonction d’optimisation
if torch.cuda.is_available():
num_episodes = 600
else:
num_episodes = 400
for i_episode in range(num_episodes):
# Initialiser l'environnement et récuperer l'état initial
state, info = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
for t in count():
action = select_action(state)
observation, reward, terminated, truncated, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
done = terminated or truncated
if terminated:
next_state = None
else:
next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
# Stocker les transitions en mémoire
# Coder ici
# Passer à l'état suivant
state = next_state
# Une étape d'optimisation du modèle (fonction à appeler)
# Coder ici
# Mise à jour soft des poids du réseau cible
# θ′ ← τ θ + (1 −τ )θ′
target_net_state_dict = target_net.state_dict()
policy_net_state_dict = policy_net.state_dict()
for key in policy_net_state_dict:
target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
target_net.load_state_dict(target_net_state_dict)
if done:
episode_durations.append(t + 1)
plot_durations()
break
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()
Complete
<Figure size 640x480 with 0 Axes>
<Figure size 640x480 with 0 Axes>
Inférence du modèle et visualisation sur CartPole¶
Coding Task¶
Maintenant que nous avons entraîné notre modèle, nous allons visualiser les performances de notre DQN sur l’environnement Cartpole en sélectionnant les meilleures actions possibles, c’est-à-dire sans utiliser la stratégie epsilon-greedy. - Récupérer les q-values prédits par notre modèle en fonction de l’observation courante - Sélectionner la meilleure action
env = gnwrapper.LoopAnimation(gym.make('CartPole-v1', render_mode="rgb_array"))
obs = env.reset()[0]
for _ in range(100):
observation_tensor = torch.FloatTensor(obs) #Convertir l'observation en tensor
q_values = # ... # Utiliser policy_net
action = torch.argmax(...).item() # Coder iciChoisir l'action avec la Q-value la plus élevée (torch.argmax)
next_obs, reward, term, trunc, info = env.step(action)
env.render()
obs = next_obs
if term or trunc:
obs = env.reset()[0]
env.display()