TP 3 - DQN#

Ce TP est disponible dans Google Colab ici,

ou peut être téléchargé comme notebook ici

Dans ce TP nous allons nous interesser au Deep Q Networks et leur implémentation pour une tâche classique de RL : le cartpole. Il s’agit d’apprendre à un agent à faire tenir un mât en équilibre sur un véhicule dans une scène 2D. L’agent à deux action possibles : gauche ou droite et reçoit une reward négative lorque le mat tombre de la voiture. Pour cela, nous allons utiliser la bibliothèque gym proposant toute sortes d’environnements et de benchmarks.

Commençons par charger quelques utilitaires

#remove " > /dev/null 2>&1" to see what is going on under the hood
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
import os

import random
import gym
import pylab
import numpy as np
from collections import deque
from gym.wrappers import Monitor
from IPython.display import HTML
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

import time
from matplotlib import pyplot as plt

import pylab as pl
from IPython import display as ipdisplay
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

PArtie 1 : DQN#

La classe DQN Agent gère l’essentiel des intéraction avec Gym et nous permettra d’avoir une vision abstraite de ce problème et de nous concentrer sur les modèles ainsi que sur leur utilisation.link text

#@title Class DQNAgent

class DQNAgent(object):
    def __init__(self, env_name, model, replay1d, update_target_model ,ddqn=False,Soft_Update=False, dueling=False,batch_size=128,gamma=0.95,memory_size=1e4):

        self.env_name = env_name
        self.env = self.wrap_env(gym.make(env_name))
        self.env.seed(0)
        self.replay1d = replay1d
        self.update_target_model = update_target_model

        # by default, CartPole-v1 has max episode steps = 500
        # we can use this to experiment beyond 500
        self.env._max_episode_steps = 4000
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.EPISODES = 2000

        # Instantiate memory
        memory_size = 10000
        #self.MEMORY = Memory(memory_size)
        self.memory = deque(maxlen=65536)

        self.gamma = 0.95    # discount rate

        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0  # exploration probability at start
        self.epsilon_min = 0.01  # minimum exploration probability
        self.epsilon_decay = 0.0005  # exponential decay rate for exploration prob

        self.batch_size = batch_size

        # defining model parameters
        self.ddqn = ddqn # use doudle deep q network
        self.Soft_Update = Soft_Update # use soft parameter update
        self.dueling = dueling
        self.epsilon_greedy = False # use epsilon greedy improved strategy

        self.TAU = .5 # target network soft update hyperparameter

        self.Save_Path = 'Models'
        if not os.path.exists(self.Save_Path): os.makedirs(self.Save_Path)
        self.scores, self.episodes, self.average = [], [], []

        self.Model_name = os.path.join(self.Save_Path, self.env_name+"_PER_D3QN_CNN.h5")

        # create main model and target model
        self.model = model(in_channels=4,action_space = self.action_size, dueling = False, name = 'online model').to(device)
        self.target_model = model(in_channels=4,action_space = self.action_size, dueling = False, name = 'target model').to(device)

        self.optimizer = optim.RMSprop(self.model.parameters(),lr=0.00025, eps=0.01, alpha=0.95)
        self.criterion = nn.MSELoss()
        self.loss = 0
        self.dd = torch.zeros(64, 4, 5, 5).to(device)

    def wrap_env(self,env):
        env = Monitor(env, './video', force=True)
        return env

    # after some time interval update the target model to be the same as the online model



    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        #print(f' state = {state}, action = {action}, reward = {reward}, next_state = {next_state}, done = {done}  ')
        self.memory.append((experience))

    def act1d(self, state, decay_step):
        # EPSILON GREEDY STRATEGY
        if self.epsilon_greedy:
        # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        # OLD EPSILON STRATEGY
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon

        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            q = random.randrange(self.action_size)
            return int(np.argmax(q)), explore_probability

        else:
            # Get action from Q-network (exploitation)
            # Estimate the Qs values state
            # Take the biggest Q value (= the best action)
            q = self.model(torch.from_numpy(state).float().unsqueeze(0).to(device))
            return int(torch.argmax(q)), explore_probability




    pylab.figure(figsize=(18, 9))
    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        pylab.xlabel('Steps', fontsize=18)
        dqn = 'DQN_'
        softupdate = ''
        dueling = ''
        greedy = ''
        PER = ''
        if self.ddqn: dqn = 'DDQN_'
        if self.Soft_Update: softupdate = '_soft'
        if self.dueling: dueling = '_Dueling'
        if self.epsilon_greedy: greedy = '_Greedy'
        try:
            pylab.savefig(dqn+self.env_name+softupdate+dueling+greedy+PER+"_CNN.png")
        except OSError:
            pass

        return str(self.average[-1])[:5]


    def reset1d(self):
        self.env.reset()
        return np.array([0,0,0,0])


    def step1d(self,action):
        next_state, reward, done, info = self.env.step(action)
        return next_state, reward, done, info


    def run1d(self):
        decay_step = 0
        loss_tab = []
        dd = 0
        dd_old = 0
        for e in range(self.EPISODES):
            state = self.reset1d()
            done = False
            i = 0
            while not done:
                decay_step += 1
                action, explore_probability = self.act1d(state, decay_step)
                next_state, reward, done, _ = self.step1d(action)

                if not done or i == self.env._max_episode_steps-1:
                    reward = reward
                else:
                    reward = -1e2
                self.remember(state, action, reward, next_state, done)
                state = next_state
                i += 1
                if done:
                    if e % 10==0:
                        update_target_model(self.Soft_Update,self.ddqn,self.model,self.target_model,self.TAU)
                    average = self.PlotModel(i, e)
                    loss_tab.append(self.loss)
                    print("episode: {}/{}, score: {}, e: {:.2}, average: {}\n".format(e, self.EPISODES, i, explore_probability, average))
                    if i == self.env._max_episode_steps:
                        print("Saving trained model to", self.Model_name)
                        break
                self.replay1d(self.memory,self.batch_size,self.model,self.target_model,self.ddqn,self.gamma,self.criterion,self.loss,self.optimizer)
        plt.plot(loss_tab)

Pour commencer nous allons construire notre réseaux chargé d’approcher la fonction de valeur état-action \(Q(s,a)\).

Coding task#

Compléter le code et implémenter le DQN suivant:

  • deux couches linéaires de dimensions (in_channels x 512), (512 x 256) et(256 x 64)

  • Une couche linéaire de taille (64 x action_space)

  • Activation ReLU après chaque couche sauf la dernière

class DQN1d(nn.Module):

    def __init__(self, action_space, in_channels, dueling=false, name='NN'):
        super(DQN1d, self).__init__()
        self.name = name
        self.dueling = dueling
        self.action_space = action_space

        ### Your code here
        ##
        ##
        ##


    def forward(self, x):
        ### Your code here
        ##
        ##
        ##


        return x

Définissons ensuite notre fonction d’apprentissage, c’est à dire la fonction permettant d’extraires des minibatch du buffer d’expériences pour permettre l’apprentissage du réseau de nerones

Coding Task#

Compléter le code suivant pour implémenter le replay

def replay1d(memory,batch_size,model,target_model,ddqn,gamma,criterion,loss,optimizer):
        # Randomly sample minibatch from the deque memory
        minibatch = random.sample(memory, min(len(memory), batch_size))

        state = np.zeros([batch_size,4])
        next_state = np.zeros([batch_size,4])
        action, reward, done = [], [], []

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop
        for i in range(len(minibatch)):
            state[i] = minibatch[i][0]
            action.append(minibatch[i][1])
            reward.append(minibatch[i][2])
            next_state[i] = minibatch[i][3]
            done.append(minibatch[i][4])
        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = model(torch.from_numpy(state).float().to(device)).detach()

        # predict best action in ending state using the main network
        target_next = model(torch.from_numpy(next_state).float().to(device)).detach()

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
              ##### Your code here #############################
              ####################################################
              # Standard - DQN
              # DQN chooses the max Q value among next actions
              # selection and evaluation of action is on the target Q Network
              # Q_max = max_a' Q_target(s', a')
              target[i][action[i]] =
              ###################################################



        # Train the Neural Network with batches
        # self.model.fit(state, target, batch_size=self.batch_size, verbose=0)
        pred = model(torch.from_numpy(state).float().to(device))
        loss = criterion(pred, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

Une dernière brique nécessaire à notre schéma d’apprentissage est de mettre en place la stratégie fixed target

Coding Task#

Compléter le code suivant pour implémenter la mise à jour des paramètres du réseaux cible. On utilisera la méthode .load_state_dict() pour copier les paramètres du réseau online

def update_target_model(Soft_Update=False,ddqn,model,target_model,TAU):
        if not Soft_Update:
            #### Your code here ####
            ##
            ##

            return
        if Soft_Update and ddqn:
            ### Optionel : Weight can be updated as a combinaison of old and new weights

Notre agent peut à présent être entraîné pour sa tâche. Vous trouverez dans le dossier Video, le films correspondant aux derniers épisodes.

env_name = 'CartPole-v1'
agent = DQNAgent(env_name, DQN1d, replay1d,update_target_model)
agent.run1d()

Partie 2 : Duealing DQN#

Dans cette partie nous allons modifier l’architecture du réseau précédent pour permettre de choisir l’approche duelling DQN.

Coding Task#

Compléter le modèle suivant pour construire \(Q\) à partir de la fonction davantage \(A\) et de \(V\)

class DQN1d(nn.Module):

    def __init__(self, action_space, in_channels, dueling=false, name='NN'):
        super(DQN1d, self).__init__()
        self.name = name
        self.dueling = dueling
        self.action_space = action_space

        ## Your code here

        self.head1 = ...
        self.head2 = ...
        self.head3 = ...
        if dueling:
           ## Your code here
           ##
           ##



        else:
            self.Q = nn.Linear(64,action_space)


    def forward(self, x):
        ## Your code here
        ....

        if self.dueling:
          ## Your code here
           ##
           ##

        else:
            x = self.Q(x)

        #print(f'{self.name} output = {x.size()}')
        return x

Notre réseau peut être ensuite utilisé pour entraîner notre agent dans sa tâche.

env_name = 'CartPole-v1'
agent = DQNAgent(env_name, DQN1d, replay1d,update_target_model,dueling=True)
agent.run1d()