A.I. learns to play Pong with DQN


There’s a huge difference between reading about Reinforcement Learning and actually implementing it. In this tutorial, I’ll implement a Deep Neural Network for Reinforcement Learning (Deep Q Network), and we’ll watch it learn until it finally becomes good enough to beat the computer at Pong!

By the end of this post, you’ll be able to do the following:
– Write a Neural Network from scratch.
– Implement a Deep Q Network with Reinforcement Learning.
– Build an A.I. for Pong that can beat the computer in less than 300 lines of Python.
– Use OpenAI gym.

Given limited time, and because this is for learning purposes, I am not aiming for a perfectly trained agent; I hope this project helps people get familiar with the basic process of DQN algorithms and Keras. Training the 4 different agents in this tutorial took 2 days on a computer with a GPU. Obtaining production-level results would require a lot more training and tuning, which is not my focus.

Tutorial prerequisites: familiarity with Neural Networks, supervised learning, TensorFlow 1.15, Keras 2.2.4, and OpenAI Gym.

This tutorial builds on my previous tutorials and reuses their code structure — that way it’s easier to follow line by line if you started learning from the first part. I should mention that there won’t be any theory here, because we already covered all the theory we need in the previous tutorials; we’ll just change several lines of code so that our network learns to play Pong instead of CartPole.

1. Open up the code to follow along and copy or clone it.
2. Follow the instructions for installing OpenAI Gym. You may need to install cmake first.
3. Run pip install gym[atari].
4. Let’s get to the next part.

We are given the following problems:
1. A sequence of images (frames) representing each frame of the Pong game.
2. An indication when we’ve won or lost the game.
3. An opponent agent that is the traditional Pong computer player.
4. An agent we control, which we can tell to take one of 6 possible actions at each frame.
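
Point 4 is easy to verify yourself once gym[atari] is installed. This is just a quick sanity check, not part of the tutorial code; the action names in the comment are what gym reports for the classic Pong-v0 environment:

import gym

env = gym.make('Pong-v0')                      # requires gym[atari]
print(env.action_space)                        # Discrete(6)
print(env.unwrapped.get_action_meanings())     # e.g. ['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']
env.close()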

Can we use these pieces to train our agent to beat the computer? Moreover, can we make our solution good enough so it could be reused to win in games that aren’t Pong?

Indeed, we can! We’ll do this by building a Neural Network that takes in each frame and outputs the move our A.I. should take.

We can break this down into the following steps:
1. Take in images from the game and preprocess them (remove the color and background, down-sample, etc.).
2. Use the Neural Network to estimate a Q-value for each possible action.
3. Pick an action from those values (epsilon-greedy: usually the best action, sometimes a random one) and tell the agent to move up, move down or stay in the same position.
4. If the round is over (you missed the ball or the opponent missed the ball), find whether you won or lost.
5. When the episode has finished (someone got to 21 points), pass the result through the training algorithm.
6. Repeat this process until our agent is tuned to the point where we can beat the computer. That’s basically it! Let’s start looking at how our code achieves this.
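
Before diving into the full code, here is a minimal, runnable sketch of steps 3–5 with a random agent standing in for the network. It is purely illustrative (an assumption of mine, not the tutorial code) and just shows how rewards and episode termination behave in Pong:

import random
import gym

env = gym.make('Pong-v0')
state = env.reset()
done, score = False, 0
while not done:
    action = random.randrange(env.action_space.n)   # step 3, but random instead of using the network
    state, reward, done, _ = env.step(action)       # step 4: reward is +1 / -1 only when a round ends
    score += reward
print("final score (agent vs. computer):", score)   # step 5: the episode ends when someone reaches 21
env.close()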

OK, now that we’ve described the problem and its solution, let’s get to writing some code (details are in the YouTube tutorial)!

You can find the full video-tutorial code and download the trained models on GitHub, or follow the code below:

# Tutorial by www.pylessons.com
# Tutorial written for - Tensorflow 1.15, Keras 2.2.4

import os
import random
import gym
import pylab
import numpy as np
from collections import deque
from keras.models import Model, load_model
from keras.layers import Input, Dense, Lambda, Add, Conv2D, Flatten
from keras.optimizers import Adam, RMSprop
from keras import backend as K
from PER import *
import cv2

def OurModel(input_shape, action_space, dueling):
    X_input = Input(input_shape)
    X = X_input

    #X = Conv2D(64, 5, strides=(3, 3), padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(32, 8, strides=(4, 4), padding="valid", input_shape=input_shape, activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 4, strides=(2, 2), padding="valid", activation="relu", data_format="channels_first")(X)
    X = Conv2D(64, 3, strides=(1, 1), padding="valid", activation="relu", data_format="channels_first")(X)
    X = Flatten()(X)

    # 'Dense' is the basic form of a neural network layer
    X = Dense(512, activation="relu", kernel_initializer='he_uniform')(X)

    X = Dense(256, activation="relu", kernel_initializer='he_uniform')(X)

    X = Dense(64, activation="relu", kernel_initializer='he_uniform')(X)

    if dueling:
        # dueling architecture: separate state-value and action-advantage streams
        state_value = Dense(1, kernel_initializer='he_uniform')(X)
        state_value = Lambda(lambda s: K.expand_dims(s[:, 0], -1), output_shape=(action_space,))(state_value)

        action_advantage = Dense(action_space, kernel_initializer='he_uniform')(X)
        action_advantage = Lambda(lambda a: a[:, :] - K.mean(a[:, :], keepdims=True), output_shape=(action_space,))(action_advantage)

        X = Add()([state_value, action_advantage])
    else:
        # Output layer with one linear node per action (6 actions in Pong-v0)
        X = Dense(action_space, activation="linear", kernel_initializer='he_uniform')(X)

    model = Model(inputs=X_input, outputs=X)
    #model.compile(loss="mean_squared_error", optimizer=RMSprop(lr=0.00025, rho=0.95, epsilon=0.01), metrics=["accuracy"])
    #model.compile(optimizer=Adam(lr=0.00025), loss='mean_squared_error')
    model.compile(optimizer=Adam(lr=0.00005), loss='mean_squared_error')

    return model

class DQNAgent:
    def __init__(self, env_name):
        self.env_name = env_name
        self.env = gym.make(env_name)
        self.action_size = self.env.action_space.n
        self.EPISODES = 1000

        # Instantiate memory
        memory_size = 25000
        self.MEMORY = Memory(memory_size)
        self.memory = deque(maxlen=memory_size)

        self.gamma = 0.99 # discount rate

        # EXPLORATION HYPERPARAMETERS for epsilon and epsilon greedy strategy
        self.epsilon = 1.0 # exploration probability at start
        self.epsilon_min = 0.02 # minimum exploration probability
        self.epsilon_decay = 0.00002 # exponential decay rate for exploration prob

        self.batch_size = 32

        # defining model parameters
        self.ddqn = False # use double deep Q network
        self.dueling = False # use dueling network
        self.epsilon_greedy = False # use epsilon greedy strategy
        self.USE_PER = False # use prioritized experience replay

        self.Save_Path = 'Models'
        if not os.path.exists(self.Save_Path): os.makedirs(self.Save_Path)
        self.scores, self.episodes, self.average = [], [], []

        self.Model_name = os.path.join(self.Save_Path, self.env_name+"_CNN.h5")

        self.ROWS = 80
        self.COLS = 80
        self.REM_STEP = 4
        self.update_model_steps = 1000

        self.state_size = (self.REM_STEP, self.ROWS, self.COLS)
        self.image_memory = np.zeros(self.state_size)

        # create main model and target model
        self.model = OurModel(input_shape=self.state_size, action_space=self.action_size, dueling=self.dueling)
        self.target_model = OurModel(input_shape=self.state_size, action_space=self.action_size, dueling=self.dueling)

    # after some time interval, update the target model to be the same as the main model
    def update_target_model(self, game_steps):
        if game_steps % self.update_model_steps == 0:
            self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        experience = state, action, reward, next_state, done
        if self.USE_PER:
            # store in the prioritized replay memory
            self.MEMORY.store(experience)
        else:
            # store in the plain deque memory
            self.memory.append(experience)

    def act(self, state, decay_step):
        if self.epsilon_greedy:
            # Here we'll use an improved version of our epsilon greedy strategy for Q-learning
            explore_probability = self.epsilon_min + (self.epsilon - self.epsilon_min) * np.exp(-self.epsilon_decay * decay_step)
        else:
            if self.epsilon > self.epsilon_min:
                self.epsilon *= (1-self.epsilon_decay)
            explore_probability = self.epsilon

        if explore_probability > np.random.rand():
            # Make a random action (exploration)
            return random.randrange(self.action_size), explore_probability
        else:
            # Get action from Q-network (exploitation)
            # Estimate the Q values of the state
            # Take the biggest Q value (= the best action)
            return np.argmax(self.model.predict(state)), explore_probability

    def replay(self):
        if self.USE_PER:
            # Sample minibatch from the PER memory
            tree_idx, minibatch = self.MEMORY.sample(self.batch_size)
        elif len(self.memory) > self.batch_size:
            # Randomly sample minibatch from the deque memory
            minibatch = random.sample(self.memory, self.batch_size)
        else:
            # not enough experiences collected yet
            return

        state = np.zeros((self.batch_size, *self.state_size), dtype=np.float32)
        action = np.zeros(self.batch_size, dtype=np.int32)
        reward = np.zeros(self.batch_size, dtype=np.float32)
        next_state = np.zeros((self.batch_size, *self.state_size), dtype=np.float32)
        done = np.zeros(self.batch_size, dtype=np.uint8)

        # do this before prediction
        # for speedup, this could be done on the tensor level
        # but easier to understand using a loop
        for i in range(len(minibatch)):
            state[i], action[i], reward[i], next_state[i], done[i] = minibatch[i]

        # do batch prediction to save speed
        # predict Q-values for starting state using the main network
        target = self.model.predict(state)
        target_old = np.array(target)
        # predict best action in ending state using the main network
        target_next = self.model.predict(next_state)
        # predict Q-values for ending state using the target network
        target_val = self.target_model.predict(next_state)

        for i in range(len(minibatch)):
            # correction on the Q value for the action used
            if done[i]:
                target[i][action[i]] = reward[i]
            else:
                # the key point of Double DQN:
                # selection of the action is from the main model,
                # the update is from the target model
                if self.ddqn: # Double - DQN
                    # current Q Network selects the action
                    # a'_max = argmax_a' Q(s', a')
                    a = np.argmax(target_next[i])
                    # target Q Network evaluates the action
                    # Q_max = Q_target(s', a'_max)
                    target[i][action[i]] = reward[i] + self.gamma * target_val[i][a]
                else: # Standard - DQN
                    # DQN chooses the max Q value among next actions
                    # selection and evaluation of the action is on the target Q Network
                    # Q_max = max_a' Q_target(s', a')
                    # when using the target model in simple DQN rules, we get better performance
                    target[i][action[i]] = reward[i] + self.gamma * np.amax(target_val[i])

        if self.USE_PER:
            indices = np.arange(self.batch_size, dtype=np.int32)
            absolute_errors = np.abs(target_old[indices, action]-target[indices, action])

            # Update priority
            self.MEMORY.batch_update(tree_idx, absolute_errors)

        # Train the Neural Network with batches
        self.model.fit(state, target, batch_size=self.batch_size, verbose=0)

    def load(self, name):
        self.model = load_model(name)

    def save(self, name):
        self.model.save(name)

    pylab.figure(figsize=(18, 9))
    def PlotModel(self, score, episode):
        self.scores.append(score)
        self.episodes.append(episode)
        self.average.append(sum(self.scores[-50:]) / len(self.scores[-50:]))
        pylab.plot(self.episodes, self.average, 'r')
        pylab.plot(self.episodes, self.scores, 'b')
        pylab.ylabel('Score', fontsize=18)
        pylab.xlabel('Games', fontsize=18)
        dqn = '_DQN'
        dueling = ''
        greedy = ''
        PER = ''
        if self.ddqn: dqn = '_DDQN'
        if self.dueling: dueling = '_Dueling'
        if self.epsilon_greedy: greedy = '_Greedy'
        if self.USE_PER: PER = '_PER'
        try:
            pylab.savefig(self.env_name+dqn+dueling+greedy+PER+"_CNN.png")
        except OSError:
            # no need to worry about the plot file when doing a lot of experiments
            pass
        self.Model_name = os.path.join(self.Save_Path, self.env_name+dqn+dueling+greedy+PER+"_CNN.h5")

        return self.average[-1]

    def imshow(self, image, rem_step=0):
        cv2.imshow("cartpole"+str(rem_step), image[rem_step,...])
        if cv2.waitKey(25) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            return

    def GetImage(self, frame):
        # cropping and down-sampling the frame to 80x80 size
        frame_cropped = frame[35:195:2, ::2,:]
        if frame_cropped.shape[0] != self.ROWS or frame_cropped.shape[1] != self.COLS:
            # OpenCV resize function
            frame_cropped = cv2.resize(frame, (self.COLS, self.ROWS), interpolation=cv2.INTER_CUBIC)

        # converting to grayscale (numpy way, weighted RGB channels)
        frame_rgb = 0.299*frame_cropped[:,:,0] + 0.587*frame_cropped[:,:,1] + 0.114*frame_cropped[:,:,2]
        # converting to grayscale (OpenCV way)
        #frame_rgb = cv2.cvtColor(frame_cropped, cv2.COLOR_RGB2GRAY)

        # dividing by 255 expresses the pixel values in the 0-1 range
        new_frame = np.array(frame_rgb).astype(np.float32) / 255.0

        # push our data back by 1 frame, similar to how a deque works
        self.image_memory = np.roll(self.image_memory, 1, axis=0)

        # inserting the new frame into the freed slot
        self.image_memory[0,:,:] = new_frame

        # show image frame (uncomment to watch what the agent sees)
        #self.imshow(self.image_memory, 0)

        return np.expand_dims(self.image_memory, axis=0)

    def reset(self):
        frame = self.env.reset()
        for i in range(self.REM_STEP):
            state = self.GetImage(frame)
        return state

    def step(self, action):
        next_state, reward, done, info = self.env.step(action)
        next_state = self.GetImage(next_state)
        return next_state, reward, done, info

    def run(self):
        decay_step = 0
        max_average = -21.0
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            score = 0
            SAVING = ''
            while not done:
                decay_step += 1
                action, explore_probability = self.act(state, decay_step)
                next_state, reward, done, _ = self.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                score += reward
                if done:
                    # every episode, plot the result
                    average = self.PlotModel(score, e)

                    # saving best models
                    if average >= max_average:
                        max_average = average
                        self.save(self.Model_name)
                        SAVING = "SAVING"
                    else:
                        SAVING = ""
                    print("episode: {}/{}, score: {}, e: {:.2f}, average: {:.2f} {}".format(e, self.EPISODES, score, explore_probability, average, SAVING))

                # update target model every self.update_model_steps game steps
                self.update_target_model(decay_step)

                # train model on a minibatch every step
                self.replay()

        # close environment when finished training
        self.env.close()

    def test(self, Model_name):
        self.load(Model_name)
        for e in range(self.EPISODES):
            state = self.reset()
            done = False
            score = 0
            while not done:
                # always take the greedy action from the trained network
                action = np.argmax(self.model.predict(state))
                state, reward, done, _ = self.step(action)
                score += reward
                if done:
                    print("episode: {}/{}, score: {}".format(e, self.EPISODES, score))

if __name__ == "__main__":
    env_name = 'Pong-v0'
    agent = DQNAgent(env_name)
    agent.run()                      # train the agent
    #agent.test(agent.Model_name)    # or evaluate an already trained model instead
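
One note on the imports: the Memory class comes from a separate PER.py file that isn’t listed here. As a rough, non-prioritized stand-in (my own simplification, not the tutorial’s actual SumTree-based implementation), it only needs to expose the store, sample and batch_update methods used above:

import random
from collections import deque

class Memory:
    # simplified stand-in for the prioritized replay memory in PER.py
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def store(self, experience):
        # the real PER version stores new experiences with maximal priority
        self.buffer.append(experience)

    def sample(self, n):
        # returns (tree_idx, minibatch); tree_idx is unused in this uniform stand-in
        minibatch = random.sample(self.buffer, min(n, len(self.buffer)))
        tree_idx = list(range(len(minibatch)))
        return tree_idx, minibatch

    def batch_update(self, tree_idx, absolute_errors):
        # the real PER version updates priorities from the TD errors; no-op here
        pass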


So I started training with the simplest model, a plain Deep Q Network. These are the parameters I used:

self.ddqn = False # use double deep Q network
self.dueling = False # use dueling network
self.USE_PER = False # use prioritized experience replay

I expected this agent to train but to perform the worst of the four; the maximum average score it could reach was close to 5 points. But let’s look at how the other models performed: