Learning From Data——DQN

之前的博客讲了reinforcement learning,但是上节课讲得更多的像是理论层面的东西,实际操作起来还是一脸懵逼。这次介绍一个非常有名的DQN(Deep Q-network),是神经网络和Q-learning结合起来的一个算法。并且在最后,我们会用它做一个有趣的事情。 ## Q-learning ##

之前我们其实提到了一下Q-learning,最重要的就是动作价值函数Q。它接受两个参数:state,以及action。在Q-learning中,我们会维护一个Q-table。然后根据Q-table来决定下一步的动作怎么选择。

之前的文章中,比较复杂的地方是在一个状态选择一个动作之后,下一个状态是什么还是不确定的,现在我们可以简化这个值,也就是每个状态做一个动作之后得到的下一个状态一定是确定的。这样,问题就会得到简化。 \[ Q(s,a) = R(s,a)+\gamma \cdot \max_{\tilde{a} }\{Q(\tilde{s},\tilde{a})\} \]

当然,即使是不确定的,学习的道理也是一样的。只不过更复杂了,我们需要去计算期望值。

我们可以看一个简单的例子来理解Q-learning。假如现在有这样的一个房间:

模型化之后长这个样子:

这时候,我们可以得到一个R-table,他表示的是每个状态取每个动作之后的奖励是多少,这是我们不可控制的部分。

同时我们需要维护的就是Q-table。我们不知道Q-table到底该长什么样子。因此最开始全部初始化为0。

现在我们来尝试更新这个Q-table,Q-learning的学习过程如下:

随机选择一个状态s,假如我们在状态1,查找R表,发现可以走的是3和5.如果采取状态5,那么根据算法的过程: \[ \begin{aligned} Q(1,5) &=R(1,5)+\max\{Q(5,1),Q(5,4),Q(5,5)\} &=100+ \max{0,0,0}\\ &= 100 \end{aligned} \]

好了,更新Q-table中,\(Q(1,5)\)为100。

同样的道理,\(Q(1,3)\)更新为0。

好了这样一个episode就结束了,我们继续这个过程,最后可以得到这个Q-table长成了这个样子:

通过查找最大的价值,我们来决定下一步怎么走。

当然,上面的例子太简单了,仔细思考的话,我们会发现一些问题。这个Q表是慢慢更新的。有时候,一个Q表的下面几个动作的Q值,并不是最终的结果。但是我们按照最大值,就会永远选择那个最大的,可能实际上它并不是最大的。这可能造成Q-table永远得不到更新。实际上,Q-learning的步骤比上面的更复杂一些,如下:

Initialize Q-table abitrarily

Repeat (for each episode):

Initialize \(s\)

Repeat (for each step of episode):

  • Choose \(a\) from \(s\) using policy derived from Q(e.g. \(\epsilon\)-greedy)
  • Take action \(a\), observe \(r\), \(s'\)
  • \(Q(s,a):= Q(s,a)+\alpha[r+\gamma \max Q(s',a') - Q(s,a)]\)
  • s := s'

until s is terminal

这里,\(\alpha\)为学习率,\(\gamma\)为折扣因子。这些一般都是经验值。Q表可以看作是agent的记忆,如果\(\alpha\)更大,我们更依赖于当前表的值,否则我们更倾向于更新的值。而\(\gamma\)则是我们是否有长远眼光的一个度量。

Q-learning还是挺强大的,但是之前我们提到了维度诅咒,如果这个维度过大,维护这个表的负担将是不可想象的。因此就有了很多别的方法来计算这个Q值,之前提到了有线性的,特征映射的线性,以及使用神经网络。当然神经网络的效果是好于其他两个的,这就是Deep Q-Network。

Sarsa

在介绍Deep Q-Network之前,我们再提一个简单的强化学习算法,叫Sarsa,它和Q-learning算法非常相似。

Initialize Q-table abitrarily

Repeat (for each episode):

Initialize \(s\)

Choose \(a\) from \(s\) using policy derived from Q(e.g. \(\\epsilon\)-greedy)

Repeat (for each step of episode):

  • Take action \(a\), observe \(r\), \(s’\)
  • Choose \(a’\) from \(s’\) using policy derived from Q(e.g. \(\\epsilon\)-greedy)
  • \(Q(s,a):= Q(s,a)+\\alpha\[r+\\gamma Q(s’,a’) - Q(s,a)\]\)
  • \(s := s’,a := a’\)

until \(s\) is terminal

它和Q-learning的区别在于,Q-learning选择了最大的\(Q(s’,a’)\),更新Q表以后,下一步并不一定会做\(a’\)的动作,而sarsa多了一个选择\(a’\)的步骤,它选的不一定是最大值,并且在下一步一定执行这个动作。

可以看到的sarsa是在线学习,它的探索一定要自己去做,而Q-learning是离线学习,它可以使用别人的经验。而实际上,sarsa也有一些别的扩展算法,如sarsa(\(\\lambda\))等,在这里就不细谈了。

Deep Q-Network

接下来就到了Deep Q-Network的内容了。之前提到了,Q-learning的问题在于,如果高维度连续的情况下,维护一个Q-table是不现实的。一个比较好的做法是把Q表的更新问题变成一个函数拟合的问题。比如输入状态\(s\),动作\(a\),再加上一个额外的参数\(\\theta\),用来得到\(Q’(s,a)\)

Q(s,a;\theta) \approx Q'(s,a)

而神经网络又可以自动提取复杂特征,所以用它来做这个事情是最合适不过了。

但是首先遇到的一个问题,神经网络需要标签,也就是\(y\)值,这个\(y\)值怎么得到呢?

上一篇博客提到了,使用物理法则,或者是模拟器等等,来创造这样的样本,产生经验元组,以供神经网络来训练。我们假设得到的目标样本为\(Q\_{\\text{target} }\),则神经网络的Loss-function为:

L(\theta) = \mathbb{E}[Q_{\text{target} } - Q(s,a,\theta)^2]

根据Q-learning得到:

Q_{\text{target} } = r + \gamma\max_{a'}Q(s',a',\theta)

此外,在神经网络与Q-learning的结合中,还诞生了一些新的概念,如经验池(experience replay),以及目标网络。

experience replay

经验池的功能主要是解决相关性及非静态分布问题。具体做法是把每个时间步agent与环境交互得到的转移样本\((s\_t,a\_t,r\_t,s\_{t+1})\)储存到回放记忆单元,要训练时就随机拿出一些(minibatch)来训练。

在我看来经验池就是存储之前得到的样本。毕竟现在reward也不是之前可以用一张表就能描述的了。

TargetNet

后来提出来的改进中,有专门一个网络,用来生成目标Q值。也许你会想说,你怎么知道这个得到的目标就一定是正确的呢?实际上,即使在原来的Q-learning中,Q表也是慢慢更新的。每一次迭代并不一定得到就是正确的值,你用来更新使用的是之前的非正确值,但是多次episode之后,这个值就趋于了稳定,收敛到正确的值,这里也是一样的。是一个互相督促的过程,先用目标网络产生目标值,让神经网络去不断逼近这个目标值,然后用再用神经网络替换目标网络,生成样本,不断这么重复。为什么这样可以?我也不是很清楚这背后的数学关系。

下面是Deep Q-Network的算法:

你可以查看这篇paper来了解关于DQN的更多信息:
Human-level control through deep reinforcement learning

最后,我们来实现一下上一篇强化学习博客中提到的cartpole。

cartpole算是python库gym中的一个小项目,除了这个,它还有很多别的小游戏可以去尝试。

我们要做的是让小车尽量保持平衡。而状态是个四维向量\((x,\\theta,\\dot x \\dot \\theta)\),动作只有两种,向左或者向右。

整个想法和Q-learning是非常相似的。我们怎么选择下一个动作?这个依然要用到\(\\epsilon\)-greedy,以\(\\epsilon\)的概率输入状态,以得到各个动作的Q值(action-value funtion),然后选取最大的Q值对应的动作作为下一个输入;另外就是随机探索了。

如果小杆子到了,说明游戏结束,这时候得到的直接就是reward,而不会有下一个状态,如果没有倒,则状态转到下一个。当然,这些经历(state,action,next_state,reward,done)都要放到经验池里,然后我们根据经验池的数据来训练神经网络。

什么时候训练网络是一个经验值。在这里我们假设time%10=0的时候来训练,也就是进行了10的整数倍次数动作之后。什么时候更换目标网络也是一个经验值。

然后我们要面临的是学习的问题。我们通过上面的式子得到y,\(\\theta\)表示的是MainNet,而\(\\hat \\theta\)表示是TargetNet,通过他们来得到y值,并用MSE loss以及Adam优化器来完成对参数的更新。需要注意的是机器学习中好的参数非常重要,如果出现错误了首先想到算法是不是写错了,接下来就要考虑调参的问题。

在这个实验中,我们对\(\\epsilon\)的值也会进行控制,想法是越往后就让它有越大的可能进行贪心选择。

其实这个算法的实现并不算难,但是需要掌握一定的pytorch相关的知识,在这方面我还是比较薄弱的。实际上这个实验是数据学习课程的一个作业。而学长已经编好了大部分的框架,只需要我们实现网络的建立,网络学习过程以及动作的选择。因此大大简化了任务。下面是我的实现。采用4×80×80×2的网络,激活函数为ReLU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# -*- coding: utf-8 -*-
"""
Created on Thu Nov 29 13:05:14 2018

@author: xtw+wlsdzyzl
"""

import random
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
torch.set_default_dtype(torch.float64)
EPISODES = 1000
'''
some api you may use:
torch.from_numpy()
torch.view()
torch.max()

'''


class net(nn.Module): # build your net
def __init__(self, state_size, action_size):
super(net, self).__init__()
'''
your code
'''
self.fc1 = nn.Linear(state_size,60)
self.fc1.weight.data.normal_(0.0,0.1)
self.fc2 = nn.Linear(60,60)
self.fc2.weight.data.normal_(0.0,0.1)
self.out = nn.Linear(60,action_size)
self.out.weight.data.normal_(0.0,0.1)

def forward(self, x):
'''
your code
'''
x = self.fc1(x)
x = nn.functional.relu(x)
x = self.fc2(x)
x = nn.functional.relu(x)
return self.out(x)





class DQNAgent: # bulid DQNagent
def __init__(self, state_size, action_size, q_model, t_model):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=2000)
self.gamma = 0.95 # discount rate
self.epsilon = 1.0 # exploration rate
self.epsilon_min = 0.001
self.epsilon_decay = 0.995
self.q_model = q_model # model
self.t_model = t_model
self.criterion = nn.MSELoss() # define loss
self.optimiser = optim.Adam(self.q_model.parameters(),lr = 0.001) #define optimiser



def remember(self, state, action, reward, next_state, done): # save memory
self.memory.append((state, action, reward, next_state, done))

def act(self, state):
'''
your code
'''
if random.random() < self.epsilon:
if random.random()<0.5:
return 0
else:
return 1
else:
# argmax Q
action_value = self.q_model.forward(torch.from_numpy(state))

return int(torch.argmax(action_value))
# returns action

def replay(self, batch_size):
minibatch = random.sample(self.memory, batch_size)
'''
your codes,
use data from memory to train you q_model
'''
for state,action,reward,next_state,done in minibatch:
if done:
yj = reward
else:
yj = reward + self.gamma*torch.max(self.t_model.forward(torch.from_numpy(next_state)))
loss = self.criterion(yj,self.q_model.forward(torch.from_numpy(state))[action])
self.optimiser.zero_grad()
loss.backward()
self.optimiser.step()

if self.epsilon > self.epsilon_min: # epsilon decay after each training
self.epsilon *= self.epsilon_decay


def update_t(self): # update t_model weights
torch.save(self.q_model.state_dict(), 'params.pkl')
self.t_model.load_state_dict(torch.load('params.pkl'))


if __name__ == "__main__":
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
q_model = net(state_size, action_size) # generate nets and DQNagent model
t_model = net(state_size, action_size)
agent = DQNAgent(state_size, action_size, q_model, t_model)
done = False
replace_target_iter = 25
batch_size = 100

for e in range(EPISODES):
state = env.reset()

if e % replace_target_iter == 0: # update t_model weights
agent.update_t()
for time in range(481):
env.render() # show the amination
action = agent.act(state) # chose action
next_state, reward, done, _ = env.step(action) # Interact with Environment
reward = reward if not done else -10 # get -10 reward if fail

agent.remember(state, action, reward, next_state, done) # save memory
state = next_state
if done:
print("episode: {}/{}, score: {}, e: {:.2}"
.format(e, EPISODES, time, agent.epsilon))
break
if len(agent.memory) > batch_size and time % 10 == 0: # train q_model
agent.replay(batch_size)