TP 3 - DQN#
Ce TP est disponible dans Google Colab ici,
ou peut être téléchargé comme notebook ici
Dans ce TP nous allons nous interesser au Deep Q Networks et leur implémentation pour une tâche classique de RL : le cartpole. Il s’agit d’apprendre à un agent à faire tenir un mât en équilibre sur un véhicule dans une scène 2D. L’agent à deux action possibles : gauche ou droite et reçoit une reward négative lorque le mat tombre de la voiture. Pour cela, nous allons utiliser la bibliothèque gym proposant toute sortes d’environnements et de benchmarks.
Commençons par charger quelques utilitaires
#remove " > /dev/null 2>&1" to see what is going on under the hood
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from gym.wrappers import Monitor
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()
import time
from matplotlib import pyplot as plt
import pylab as pl
from IPython import display as ipdisplay
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PArtie 1 : DQN#
La classe DQN Agent gère l’essentiel des intéraction avec Gym et nous permettra d’avoir une vision abstraite de ce problème et de nous concentrer sur les modèles ainsi que sur leur utilisation.link text
#@title Class DQNAgent
class DQNAgent(object):
def __init__(self, env_name, model, replay1d, update_target_model ,ddqn=False,Soft_Update=False, dueling=False,batch_size=128,gamma=0.95,memory_size=1e4):
self.env_name = env_name
self.env = self.wrap_env(gym.make(env_name))
self.env.seed(0)
self.replay1d = replay1d
self.update_target_model = update_target_model
# by default, CartPole-v1 has max episode steps = 500
# we can use this to experiment beyond 500
self.env._max_episode_steps = 4000
self.state_size = self.env.observation_space.shape[0]
self.action_size = self.env.action_space.n
self.EPISODES = 2000
# Instantiate memory
memory_size = 10000
#self.MEMORY = Memory(memory_size)
self.memory = deque(maxlen=65536)
self.gamma = 0.95 # discount rate
# EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
self.epsilon = 1.0 # exploration probability at start
self.epsilon_min = 0.01 # minimum exploration probability
self.epsilon_decay = 0.0005 # exponential decay rate for exploration prob
self.batch_size = batch_size
# defining model parameters
self.ddqn = ddqn # use doudle deep q network
self.Soft_Update = Soft_Update # use soft parameter update
self.dueling = dueling
self.epsilon_greedy = False # use epsilon greedy improved strategy
self.TAU = .5 # target network soft update hyperparameter
self.Save_Path = 'Models'
if not os.path.exists(self.Save_Path): os.makedirs(self.Save_Path)
self.scores, self.episodes, self.average = [], [], []
self.Model_name = os.path.join(self.Save_Path, self.env_name+"_PER_D3QN_CNN.h5")
# create main model and target model
self.model = model(in_channels=4,action_space = self.action_size, dueling = False, name = 'online model').to(device)
self.target_model = model(in_channels=4,action_space = self.action_size, dueling = False, name = 'target model').to(device)
self.optimizer = optim.RMSprop(self.model.parameters(),lr=0.00025, eps=0.01, alpha=0.95)
self.criterion = nn.MSELoss()
self.loss = 0
self.dd = torch.zeros(64, 4, 5, 5).to(device)
def wrap_env(self,env):
env = Monitor(env, './video', force=True)
return env
# after some time interval update the target model to be the same as the online model
def remember(self, state, action, reward, next_state, done):
experience = state, action, reward, next_state, done
#print(f' state = {state}, action = {action}, reward = {reward}, next_state = {next_state}, done = {done} ')
self.memory.append((experience))
def act1d(self, state, decay_step):
# EPSILON GREEDY STRATEGY
if self.epsilon_greedy:
# Here we'll use an improved version of our epsilon greedy strategy for Q-learning
explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
# OLD EPSILON STRATEGY
else:
if self.epsilon > self.epsilon_min:
self.epsilon *= (1-self.epsilon_decay)
explore_probability = self.epsilon
if explore_probability > np.random.rand():
# Make a random action (exploration)
q = random.randrange(self.action_size)
return int(np.argmax(q)), explore_probability
else:
# Get action from Q-network (exploitation)
# Estimate the Qs values state
# Take the biggest Q value (= the best action)
q = self.model(torch.from_numpy(state).float().unsqueeze(0).to(device))
return int(torch.argmax(q)), explore_probability
pylab.figure(figsize=(18, 9))
def PlotModel(self, score, episode):
self.scores.append(score)
self.episodes.append(episode)
self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
pylab.plot(self.episodes, self.average, 'r')
pylab.plot(self.episodes, self.scores, 'b')
pylab.ylabel('Score', fontsize=18)
pylab.xlabel('Steps', fontsize=18)
dqn = 'DQN_'
softupdate = ''
dueling = ''
greedy = ''
PER = ''
if self.ddqn: dqn = 'DDQN_'
if self.Soft_Update: softupdate = '_soft'
if self.dueling: dueling = '_Dueling'
if self.epsilon_greedy: greedy = '_Greedy'
try:
pylab.savefig(dqn+self.env_name+softupdate+dueling+greedy+PER+"_CNN.png")
except OSError:
pass
return str(self.average[-1])[:5]
def reset1d(self):
self.env.reset()
return np.array([0,0,0,0])
def step1d(self,action):
next_state, reward, done, info = self.env.step(action)
return next_state, reward, done, info
def run1d(self):
decay_step = 0
loss_tab = []
dd = 0
dd_old = 0
for e in range(self.EPISODES):
state = self.reset1d()
done = False
i = 0
while not done:
decay_step += 1
action, explore_probability = self.act1d(state, decay_step)
next_state, reward, done, _ = self.step1d(action)
if not done or i == self.env._max_episode_steps-1:
reward = reward
else:
reward = -1e2
self.remember(state, action, reward, next_state, done)
state = next_state
i += 1
if done:
if e % 10==0:
update_target_model(self.Soft_Update,self.ddqn,self.model,self.target_model,self.TAU)
average = self.PlotModel(i, e)
loss_tab.append(self.loss)
print("episode: {}/{}, score: {}, e: {:.2}, average: {}\n".format(e, self.EPISODES, i, explore_probability, average))
if i == self.env._max_episode_steps:
print("Saving trained model to", self.Model_name)
break
self.replay1d(self.memory,self.batch_size,self.model,self.target_model,self.ddqn,self.gamma,self.criterion,self.loss,self.optimizer)
plt.plot(loss_tab)
Pour commencer nous allons construire notre réseaux chargé d’approcher la fonction de valeur état-action \(Q(s,a)\).
Coding task#
Compléter le code et implémenter le DQN suivant:
deux couches linéaires de dimensions (in_channels x 512), (512 x 256) et(256 x 64)
Une couche linéaire de taille (64 x action_space)
Activation ReLU après chaque couche sauf la dernière
class DQN1d(nn.Module):
def __init__(self, action_space, in_channels, dueling=false, name='NN'):
super(DQN1d, self).__init__()
self.name = name
self.dueling = dueling
self.action_space = action_space
### Your code here
##
##
##
def forward(self, x):
### Your code here
##
##
##
return x
Définissons ensuite notre fonction d’apprentissage, c’est à dire la fonction permettant d’extraires des minibatch du buffer d’expériences pour permettre l’apprentissage du réseau de nerones
Coding Task#
Compléter le code suivant pour implémenter le replay
def replay1d(memory,batch_size,model,target_model,ddqn,gamma,criterion,loss,optimizer):
# Randomly sample minibatch from the deque memory
minibatch = random.sample(memory, min(len(memory), batch_size))
state = np.zeros([batch_size,4])
next_state = np.zeros([batch_size,4])
action, reward, done = [], [], []
# do this before prediction
# for speedup, this could be done on the tensor level
# but easier to understand using a loop
for i in range(len(minibatch)):
state[i] = minibatch[i][0]
action.append(minibatch[i][1])
reward.append(minibatch[i][2])
next_state[i] = minibatch[i][3]
done.append(minibatch[i][4])
# do batch prediction to save speed
# predict Q-values for starting state using the main network
target = model(torch.from_numpy(state).float().to(device)).detach()
# predict best action in ending state using the main network
target_next = model(torch.from_numpy(next_state).float().to(device)).detach()
for i in range(len(minibatch)):
# correction on the Q value for the action used
if done[i]:
target[i][action[i]] = reward[i]
else:
##### Your code here #############################
####################################################
# Standard - DQN
# DQN chooses the max Q value among next actions
# selection and evaluation of action is on the target Q Network
# Q_max = max_a' Q_target(s', a')
target[i][action[i]] =
###################################################
# Train the Neural Network with batches
# self.model.fit(state, target, batch_size=self.batch_size, verbose=0)
pred = model(torch.from_numpy(state).float().to(device))
loss = criterion(pred, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
Une dernière brique nécessaire à notre schéma d’apprentissage est de mettre en place la stratégie fixed target
Coding Task#
Compléter le code suivant pour implémenter la mise à jour des paramètres
du réseaux cible. On utilisera la méthode .load_state_dict() pour
copier les paramètres du réseau online
def update_target_model(Soft_Update=False,ddqn,model,target_model,TAU):
if not Soft_Update:
#### Your code here ####
##
##
return
if Soft_Update and ddqn:
### Optionel : Weight can be updated as a combinaison of old and new weights
Notre agent peut à présent être entraîné pour sa tâche. Vous trouverez dans le dossier Video, le films correspondant aux derniers épisodes.
env_name = 'CartPole-v1'
agent = DQNAgent(env_name, DQN1d, replay1d,update_target_model)
agent.run1d()
Partie 2 : Duealing DQN#
Dans cette partie nous allons modifier l’architecture du réseau précédent pour permettre de choisir l’approche duelling DQN.
Coding Task#
Compléter le modèle suivant pour construire \(Q\) à partir de la fonction davantage \(A\) et de \(V\)
class DQN1d(nn.Module):
def __init__(self, action_space, in_channels, dueling=false, name='NN'):
super(DQN1d, self).__init__()
self.name = name
self.dueling = dueling
self.action_space = action_space
## Your code here
self.head1 = ...
self.head2 = ...
self.head3 = ...
if dueling:
## Your code here
##
##
else:
self.Q = nn.Linear(64,action_space)
def forward(self, x):
## Your code here
....
if self.dueling:
## Your code here
##
##
else:
x = self.Q(x)
#print(f'{self.name} output = {x.size()}')
return x
Notre réseau peut être ensuite utilisé pour entraîner notre agent dans sa tâche.
env_name = 'CartPole-v1'
agent = DQNAgent(env_name, DQN1d, replay1d,update_target_model,dueling=True)
agent.run1d()