6. 深度Q网络
6.1 表格法价值迭代的问题
表格法价值迭代的问题主要在于当场景的状态数过多,对于计算机的存储和计算都将是很大的挑战。尤其是遇到一些需要及时响应的场景,系统又显得十分的笨重和迟钝。
以Atari为例,游戏屏幕的分辨率为(210,160)像素,如果每个像素的颜色有128种。那么不同屏幕的状态有种。如果用表格法去迭代如此多状态的行动方案,既不科学,也很浪费资源。因为这其中99.9%的状态可能都是不会遇到的。
这时候就有了另一条思路:把经常碰到的场景解决好,没必要迭代所有的状态。
具体的算法跟前面提到的价值迭代差不多:
-
从的空表开始迭代 -
从环境中获取。 -
Bellman更新:。 -
检查收敛条件,回到step 2。
以Frozenlake为例:
import gym
import collections
from tensorboardX import SummaryWriter
ENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20
class Agent:
def __init__(self):
self.env = gym.make(ENV_NAME) # 初始化环境
self.state = self.env.reset() # 初始化状态
self.values = collections.defaultdict(float) # 初始化Q表
def sample_env(self): # 随机策略尝试
action = self.env.action_space.sample()
old_state = self.state
new_state, reward, is_done, _ = self.env.step(action)
self.state = self.env.reset() if is_done else new_state
return (old_state, action, reward, new_state)
def best_value_and_action(self, state): # greedy策略
best_value, best_action = None, None
for action in range(self.env.action_space.n):
action_value = self.values[(state, action)]
if best_value is None or best_value < action_value:
best_value = action_value
best_action = action
return best_value, best_action
def value_update(self, s, a, r, next_s): # bellman更新Q表
best_v, _ = self.best_value_and_action(next_s)
new_val = r + GAMMA * best_v
old_val = self.values[(s, a)]
self.values[(s, a)] = old_val * (1-ALPHA) + new_val * ALPHA
def play_episode(self, env): # 运用当前的Q表greedy策略测试play效果
total_reward = 0.0
state = env.reset()
while True:
_, action = self.best_value_and_action(state)
new_state, reward, is_done, _ = env.step(action)
total_reward += reward
if is_done:
break
state = new_state
return total_reward
训练过程代码:
if __name__ == "__main__":
test_env = gym.make(ENV_NAME)
agent = Agent()
writer = SummaryWriter(comment="-q-learning")
iter_no = 0
best_reward = 0.0
while True:
iter_no += 1
s, a, r, next_s = agent.sample_env()
agent.value_update(s, a, r, next_s) # 随机策略更新Q表
reward = 0.0
# 测试阶段不做Q表更新
for _ in range(TEST_EPISODES): # 每次测试20个episodes,计算平均reward
reward += agent.play_episode(test_env)
reward /= TEST_EPISODES
writer.add_scalar("reward", reward, iter_no)
if reward > best_reward:
print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
best_reward = reward
if reward > 0.80:
print("Solved in %d iterations!" % iter_no)
break
writer.close()
以我的电脑为例,结果为:

从这个训练结果可以看出。这里由于在测试阶段并没有更新Q表,Q表更新完全依赖于随机策略,所以中间测试阶段平均回报存在不小的波动。训练过程并非逐渐改善。
6.2 深度Q学习
从前文的Q表学习发现,方法依然是在迭代所有状态的结果。在状态数较少的情况下,适用性还是具备的。当我们把Q学习应用在更加复杂的场景里,很容易发现多种状态的学习其实是没有什么太大的意义的。以Atari游戏为例,每帧画面作为一个状态的话,很多相似的画面,其实并没有太大的区别。分别迭代牺牲算力的同时,对于解决问题的帮助并不大。
此外,有时候将Q学习方法用在游戏里,每帧的画面可能并不意味着一个确定的状态。连续的帧提供的信息可能更准确。
考虑到Q表方法对于状态更多的场景处理的困难,深度Q学习则指望利用网络结构来表达“状态”、“行为”到Q值的非线性映射表达,类似于一个回归问题。
基本的算法框架如下:
-
初始化一个非线性映射关系。 -
与环境互动,获得训练数据。 -
计算损失:若游戏结束,;若游戏未结束,。 -
运用SGD方法更新,最小化。 -
循环至2,直至收敛。
上面只是最最原始和基本的深度Q学习算法框架。显然直接这样做是不行的。流程上还有几点会遇到的细节困难需要说明:
-
随机策略在训练初期更重要,贪婪策略在训练的中后期更加重要。于是有了epsilon-greedy算法,且epsilon逐渐衰减。 -
近似状态行为到价值Q的非线性关系,用SGD方法。然而SGD要求训练数据满足独立同分布条件。显然强化学习的训练数据不满足这个条件。每个batch的数据来源于同一个episode,前后存在路径相关性。再者,我们的训练数据不是从我们期望寻找的optimal policy中得到的。训练数据是从某个随机策略或者其他某种策略中得到,同分布的条件也不满足。解决这个问题的办法就是使用replay buffer。使得训练数据中新旧搭配。 -
Bellman方程中使用自助(bootstrapping)来迭代,而估算时会用到。然而和仅一步之遥,两个状态可能极其相似。当我们在改变参数使得变得更好的同时,存在让变得更糟糕的可能性。并且Q网络也变得更加不稳定。解决这个问题的办法就是:每步训练调整,但步之后才将的参数同步到。而每步都用来计算。 -
RL方法的基础是MDP马尔科夫决策过程。然而当把RL方法应用于游戏场景中时,显然马尔科夫性被破坏。Partially Observable MDPs(POMDP)。Atari中的解决方式是用多个帧作为一个状态。
基于上述几点,深度Q学习算法框架通常一般的流程如下:
-
随机参数初始化两个非线性Q网络和。,且创建一个空的replay buffer。 -
以概率选择随机行动,否则。 -
执行行动,并观察回报和下一个状态。 -
存储到replay buffer中。 -
从replay buffer中随机抽样构造minibatch。 -
buffer中的状态转移关系,如果为episode的总结状态,目标;如果并非episode的终结状态,则目标。 -
计算损失。 -
应用SGD方法最小化损失,更新非线性Q网络。 -
每N步将同步到。 -
回到第2步,直至收敛。
6.3 以Pong游戏为例的DQN
6.3.1 构造计算Q值的网络结构
这里很简单,就是用一个网络结构来映射不同的状态到不同行为可能得到的Q值。网络结构如下:
import torch
import torch.nn as nn
import numpy as np
class DQN(nn.Module):
def __init__(self, input_shape, n_actions):
super(DQN, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(64, 64, kernel_size=3, stride=1),
nn.ReLU()
)
conv_out_size = self._get_conv_out(input_shape)
self.fc = nn.Sequential(
nn.Linear(conv_out_size, 512),
nn.ReLU(),
nn.Linear(512, n_actions)
)
def _get_conv_out(self, shape): # 这个函数用来返回通过卷积层并flatten后的维度
o = self.conv(torch.zeros(1, *shape))
# 用batch=1,输入数据形状通过卷积网络测试卷积完成后的尺寸,然后计算flatten之后的向量的长度
return int(np.prod(o.size())) # 运用np.prod把各个维度上的数值相乘
def forward(self, x):
conv_out = self.conv(x).view(x.size()[0], -1)
# 保留batch数,输出每个batch训练数据中不同行为a对应的Q值
# x.size()[0] 就是batch数
return self.fc(conv_out)
从上述代码可知,该Q网络由三层卷积构成,每层卷积中间用ReLU()激活。最终线性层输出的结果正好维度与行动数n_actions相等,得到不同行动可以获得的Q值。
这里由于输入信息为游戏画面的图像,网络结构为几层卷积。对于卷积网络结构多说几句。input_shape为图像的Size,比如例子中图像的Size就是(3, 210, 160)。画面就是3通道,画面尺寸为210*160。
这个例子里要注意的是,我们在用下面语句new一个DQN网络对象时,
net=DQN(input_shape, n_actions)
初始化网络结构输入的参数input_shape就是图像的Size,不能将batch数也给进去。否则在:
nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4)
就出错了。input_shape这里是(3, 210, 160),不能写成(50, 3, 210, 160)(比如batch=50)。
通常自己在检测代码的时候可以对网络结构做些简单测试。比如像下面这样:
x = torch.randn(12, 1, 210, 160) # 随机生成一个batch的数据
input_shape = x[0].shape # 用batch中的一个训练样本得到input_shape
net = DQN(input_shape, n_actions=8) # 初始化一个网络
print(net(x).shape) # 测试一个batch数据经过网络前向计算后的结果
比如按照上面的方法测试网络结构就可以发现最后net(x)输出的结果的形状就是:
torch.Size([12, 8])
batch中12个样本(状态)对应的8种行为分别对应的Q值。说明网络结构没问题,那么非线性表达就完成了。
6.3.2 训练过程
#!/usr/bin/env python3
from lib import wrappers
from lib import dqn_model
import argparse
import time
import numpy as np
import collections
import torch
import torch.nn as nn
import torch.optim as optim
from tensorboardX import SummaryWriter
# 一些初始化参数的设定
DEFAULT_ENV_NAME = "PongNoFrameskip-v4"
MEAN_REWARD_BOUND = 19.5
GAMMA = 0.99
BATCH_SIZE = 32
REPLAY_SIZE = 10000
LEARNING_RATE = 1e-4
SYNC_TARGET_FRAMES = 1000
REPLAY_START_SIZE = 10000
# epsilon参数衰减设定
EPSILON_DECAY_LAST_FRAME = 10**5
EPSILON_START = 1.0
EPSILON_FINAL = 0.02
Experience = collections.namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])
# 用namedtuple表示经验数据的结构,训练数据由此生成
class ExperienceBuffer:
def __init__(self, capacity):
# 由于ExperienceBuffer容量固定,所以deque定义buffer类型
self.buffer = collections.deque(maxlen=capacity)
def __len__(self): # 返回buffer长度
return len(self.buffer)
def append(self, experience): # buffer中增加数据
self.buffer.append(experience)
def sample(self, batch_size): # 从buffer中抽样获得一个batch的数据
indices = np.random.choice(len(self.buffer), batch_size, replace=False)
# 按照随机选取的位置抽取数据,用zip(*)解压分别返回
# 因为buffer为Experience组成的deque
# 抽样结果把states、actions等用zip分别拆开,再返回ndarray的形式
states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
return np.array(states), np.array(actions), np.array(rewards, dtype=np.float32),
np.array(dones, dtype=np.uint8), np.array(next_states)
class Agent:
def __init__(self, env, exp_buffer):
self.env = env # 初始化给定环境
self.exp_buffer = exp_buffer # 初始化给定训练数据池exp_buffer
self._reset() # 重置环境至最初状态
def _reset(self):
self.state = env.reset() # 环境初始化
self.total_reward = 0.0 # reward重置为0.0
def play_step(self, net, epsilon=0.0, device="cpu"):
done_reward = None
if np.random.random() < epsilon:
action = env.action_space.sample() # 以epsilon的概率随机选择action
else:
state_a = np.array([self.state], copy=False)
state_v = torch.tensor(state_a).to(device)
q_vals_v = net(state_v) # 用网络估算当前状态的Q值向量
_, act_v = torch.max(q_vals_v, dim=1) # 最大化Q值选择行动
action = int(act_v.item())
# do step in the environment
new_state, reward, is_done, _ = self.env.step(action)
# 获取新的状态、当前回报,并判定是否到终点
self.total_reward += reward # 累积回报
# 打包新的经验数据
exp = Experience(self.state, action, reward, is_done, new_state)
# 把新经验放入exp_buffer
self.exp_buffer.append(exp)
self.state = new_state
if is_done:
done_reward = self.total_reward
self._reset()
return done_reward
# 计算DQN的Q网络的损失
def calc_loss(batch, net, tgt_net, device="cpu"):
states, actions, rewards, dones, next_states = batch
states_v = torch.tensor(states).to(device)
next_states_v = torch.tensor(next_states).to(device)
actions_v = torch.tensor(actions).to(device)
rewards_v = torch.tensor(rewards).to(device)
done_mask = torch.ByteTensor(dones).to(device)
# 用网络net计算当前状态行为的值
state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
# 用目标网络tgt_net计算下一个状态(最大化Q值对应行为)的值
next_state_values = tgt_net(next_states_v).max(1)[0]
# 游戏终结,episode结束,下一个状态的值为0.0
next_state_values[done_mask] = 0.0
next_state_values = next_state_values.detach()
# 期望的状态行为值,为未来的状态行为值乘GAMMA折现后加上兑现的reward
expected_state_action_values = next_state_values * GAMMA + rewards_v
# 比较期望状态行为值和当前状态行为值的差值并返回
return nn.MSELoss()(state_action_values, expected_state_action_values)
if __name__ == "__main__":
# 前面这一堆主要用来命令行跑程序时喂给程序的参数设定。不熟悉的去了解下CLI。
parser = argparse.ArgumentParser()
parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")
parser.add_argument("--env", default=DEFAULT_ENV_NAME,
help="Name of the environment, default=" + DEFAULT_ENV_NAME)
parser.add_argument("--reward", type=float, default=MEAN_REWARD_BOUND,
help="Mean reward boundary for stop of training, default=%.2f" % MEAN_REWARD_BOUND)
args = parser.parse_args()
# 判定是否使用GPU训练
device = torch.device("cuda" if args.cuda else "cpu")
# 设定环境,这里涉及到游戏环境的封装,不准备讲wrappers了。
env = wrappers.make_env(args.env)
# 初始化两个同结构的Q网络
net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)
tgt_net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)
writer = SummaryWriter(comment="-" + args.env)
print(net)
buffer = ExperienceBuffer(REPLAY_SIZE)
agent = Agent(env, buffer)
epsilon = EPSILON_START
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
total_rewards = []
frame_idx = 0
ts_frame = 0
ts = time.time()
best_mean_reward = None
while True:
frame_idx += 1
# epsilon的衰减
epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)
reward = agent.play_step(net, epsilon, device=device)
if reward is not None:
total_rewards.append(reward)
speed = (frame_idx - ts_frame) / (time.time() - ts)
ts_frame = frame_idx
ts = time.time()
mean_reward = np.mean(total_rewards[-100:])
print("%d: done %d games, mean reward %.3f, eps %.2f, speed %.2f f/s" % (
frame_idx, len(total_rewards), mean_reward, epsilon,
speed
))
writer.add_scalar("epsilon", epsilon, frame_idx)
writer.add_scalar("speed", speed, frame_idx)
writer.add_scalar("reward_100", mean_reward, frame_idx)
writer.add_scalar("reward", reward, frame_idx)
if best_mean_reward is None or best_mean_reward < mean_reward:
torch.save(net.state_dict(), args.env + "-best.dat")
if best_mean_reward is not None:
print("Best mean reward updated %.3f -> %.3f, model saved" % (best_mean_reward, mean_reward))
best_mean_reward = mean_reward
if mean_reward > args.reward:
print("Solved in %d frames!" % frame_idx)
break
if len(buffer) < REPLAY_START_SIZE:
continue
# 一段时间同步一次网络结构参数
if frame_idx % SYNC_TARGET_FRAMES == 0:
tgt_net.load_state_dict(net.state_dict())
optimizer.zero_grad()
batch = buffer.sample(BATCH_SIZE)
loss_t = calc_loss(batch, net, tgt_net, device=device)
loss_t.backward()
optimizer.step()
writer.close()
6.3.3 小结
本来还有一个wrapper模块。但考虑到现实中其他场景的应用并不一定跟这里的游戏环境一样。所以经常有擅长调包的朋友问我做Reinforcement Learning的时候用的什么包。我觉得很不好回答,只好说去看看gym吧。
其实吧,gym也是给大家提供了一些常用实验环境的框架,有兴趣的可以去看看源码。但我觉得并不是那么必要。真实要理解的还是RL的方法和思路。很多应用场景需要我们自己去搭建env的。
而且RL是一个非常开放的体系,大家都是在设定机器人,很多设定不一样得到的结果天差地别。我们选取的超参数决定了机器的“性格”,行动的约束等等都大概率锁定了机器最佳行动策略policy的选择。
这篇大家理解两个东西就好了。
-
状态行为价值这个映射关系,当状态复杂到一定程度,不能用表格法来表达的时候,我们可以用一个网络结构来表达这种映射关系。由原来的迭代表格中的值,变为迭代网络结构中参数。 -
DQN的训练过程、算法、net和tgt_net、replay_buffer等等。
另有一个需要注意的地方:上面例子的场景是游戏场景,每个episode有game over的时候。而现实中我们可能会面临no-terminal的场景。单步reward的计算,total_reward的评估跟上面的流程有稍许差异。
原文始发于微信公众号(拒绝拍脑袋):【授人以渔】RL系列(4)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/54875.html