【授人以渔】RL系列(4)

6. 深度Q网络

6.1 表格法价值迭代的问题

表格法价值迭代的问题主要在于当场景的状态数过多,对于计算机的存储和计算都将是很大的挑战。尤其是遇到一些需要及时响应的场景,系统又显得十分的笨重和迟钝。

以Atari为例,游戏屏幕的分辨率为(210,160)像素,如果每个像素的颜色有128种。那么不同屏幕的状态有种。如果用表格法去迭代如此多状态的行动方案,既不科学,也很浪费资源。因为这其中99.9%的状态可能都是不会遇到的。

这时候就有了另一条思路:把经常碰到的场景解决好,没必要迭代所有的状态

具体的算法跟前面提到的价值迭代差不多:

  1. 的空表开始迭代
  2. 从环境中获取
  3. Bellman更新:
  4. 检查收敛条件,回到step 2。

以Frozenlake为例:

import gym
import collections
from tensorboardX import SummaryWriter

ENV_NAME = "FrozenLake-v0"
GAMMA = 0.9
ALPHA = 0.2
TEST_EPISODES = 20


class Agent:
    def __init__(self):
        self.env = gym.make(ENV_NAME)  # 初始化环境
        self.state = self.env.reset()  # 初始化状态
        self.values = collections.defaultdict(float)  # 初始化Q表

    def sample_env(self):    # 随机策略尝试
        action = self.env.action_space.sample()
        old_state = self.state
        new_state, reward, is_done, _ = self.env.step(action)
        self.state = self.env.reset() if is_done else new_state
        return (old_state, action, reward, new_state)

    def best_value_and_action(self, state):  # greedy策略
        best_value, best_action = NoneNone
        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_value, best_action

    def value_update(self, s, a, r, next_s):  # bellman更新Q表
        best_v, _ = self.best_value_and_action(next_s)
        new_val = r + GAMMA * best_v
        old_val = self.values[(s, a)]
        self.values[(s, a)] = old_val * (1-ALPHA) + new_val * ALPHA

    def play_episode(self, env):  # 运用当前的Q表greedy策略测试play效果
        total_reward = 0.0
        state = env.reset()
        while True:
            _, action = self.best_value_and_action(state)
            new_state, reward, is_done, _ = env.step(action)
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward

训练过程代码:

if __name__ == "__main__":
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-q-learning")

    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        s, a, r, next_s = agent.sample_env()
        agent.value_update(s, a, r, next_s)  # 随机策略更新Q表

        reward = 0.0
        
        # 测试阶段不做Q表更新
        for _ in range(TEST_EPISODES):   # 每次测试20个episodes,计算平均reward
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES
        writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" % (best_reward, reward))
            best_reward = reward
        if reward > 0.80:
            print("Solved in %d iterations!" % iter_no)
            break
    writer.close()

以我的电脑为例,结果为:

【授人以渔】RL系列(4)
训练结果

从这个训练结果可以看出。这里由于在测试阶段并没有更新Q表,Q表更新完全依赖于随机策略,所以中间测试阶段平均回报存在不小的波动。训练过程并非逐渐改善。

6.2 深度Q学习

从前文的Q表学习发现,方法依然是在迭代所有状态的结果。在状态数较少的情况下,适用性还是具备的。当我们把Q学习应用在更加复杂的场景里,很容易发现多种状态的学习其实是没有什么太大的意义的。以Atari游戏为例,每帧画面作为一个状态的话,很多相似的画面,其实并没有太大的区别。分别迭代牺牲算力的同时,对于解决问题的帮助并不大。

此外,有时候将Q学习方法用在游戏里,每帧的画面可能并不意味着一个确定的状态。连续的帧提供的信息可能更准确。

考虑到Q表方法对于状态更多的场景处理的困难,深度Q学习则指望利用网络结构来表达“状态”、“行为”到Q值的非线性映射表达,类似于一个回归问题。

基本的算法框架如下

  1. 初始化一个非线性映射关系
  2. 与环境互动,获得训练数据
  3. 计算损失:若游戏结束,;若游戏未结束,
  4. 运用SGD方法更新,最小化
  5. 循环至2,直至收敛。

上面只是最最原始和基本的深度Q学习算法框架。显然直接这样做是不行的。流程上还有几点会遇到的细节困难需要说明:

  1. 随机策略在训练初期更重要,贪婪策略在训练的中后期更加重要。于是有了epsilon-greedy算法,且epsilon逐渐衰减。
  2. 近似状态行为到价值Q的非线性关系,用SGD方法。然而SGD要求训练数据满足独立同分布条件。显然强化学习的训练数据不满足这个条件。每个batch的数据来源于同一个episode,前后存在路径相关性。再者,我们的训练数据不是从我们期望寻找的optimal policy中得到的。训练数据是从某个随机策略或者其他某种策略中得到,同分布的条件也不满足。解决这个问题的办法就是使用replay buffer。使得训练数据中新旧搭配。
  3. Bellman方程中使用自助(bootstrapping)来迭代,而估算时会用到。然而仅一步之遥,两个状态可能极其相似。当我们在改变参数使得变得更好的同时,存在让变得更糟糕的可能性。并且Q网络也变得更加不稳定。解决这个问题的办法就是:每步训练调整,但步之后才将的参数同步到。而每步都用来计算
  4. RL方法的基础是MDP马尔科夫决策过程。然而当把RL方法应用于游戏场景中时,显然马尔科夫性被破坏。Partially Observable MDPs(POMDP)。Atari中的解决方式是用多个帧作为一个状态。

基于上述几点,深度Q学习算法框架通常一般的流程如下

  1. 随机参数初始化两个非线性Q网络,且创建一个空的replay buffer。
  2. 以概率选择随机行动,否则
  3. 执行行动,并观察回报和下一个状态
  4. 存储到replay buffer中。
  5. 从replay buffer中随机抽样构造minibatch。
  6. buffer中的状态转移关系,如果为episode的总结状态,目标;如果并非episode的终结状态,则目标
  7. 计算损失
  8. 应用SGD方法最小化损失,更新非线性Q网络
  9. 每N步将同步到
  10. 回到第2步,直至收敛。

6.3 以Pong游戏为例的DQN

6.3.1 构造计算Q值的网络结构

这里很简单,就是用一个网络结构来映射不同的状态到不同行为可能得到的Q值。网络结构如下:

import torch
import torch.nn as nn
import numpy as np


class DQN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQN, self).__init__()

        self.conv = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(3264, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(6464, kernel_size=3, stride=1),
            nn.ReLU()
        )

        conv_out_size = self._get_conv_out(input_shape)
        self.fc = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def _get_conv_out(self, shape):  # 这个函数用来返回通过卷积层并flatten后的维度
        o = self.conv(torch.zeros(1, *shape))  
        # 用batch=1,输入数据形状通过卷积网络测试卷积完成后的尺寸,然后计算flatten之后的向量的长度
        return int(np.prod(o.size())) # 运用np.prod把各个维度上的数值相乘

    def forward(self, x):
        conv_out = self.conv(x).view(x.size()[0], -1)
        # 保留batch数,输出每个batch训练数据中不同行为a对应的Q值
        # x.size()[0] 就是batch数
        return self.fc(conv_out)

从上述代码可知,该Q网络由三层卷积构成,每层卷积中间用ReLU()激活。最终线性层输出的结果正好维度与行动数n_actions相等,得到不同行动可以获得的Q值。

这里由于输入信息为游戏画面的图像,网络结构为几层卷积。对于卷积网络结构多说几句。input_shape为图像的Size,比如例子中图像的Size就是(3, 210, 160)。画面就是3通道,画面尺寸为210*160。

这个例子里要注意的是,我们在用下面语句new一个DQN网络对象时,

net=DQN(input_shape, n_actions)

初始化网络结构输入的参数input_shape就是图像的Size,不能将batch数也给进去。否则在:

nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4)

就出错了。input_shape这里是(3, 210, 160),不能写成(50, 3, 210, 160)(比如batch=50)。

通常自己在检测代码的时候可以对网络结构做些简单测试。比如像下面这样:

x = torch.randn(121210160)   # 随机生成一个batch的数据
input_shape = x[0].shape           # 用batch中的一个训练样本得到input_shape
net = DQN(input_shape, n_actions=8)  # 初始化一个网络
print(net(x).shape)          # 测试一个batch数据经过网络前向计算后的结果

比如按照上面的方法测试网络结构就可以发现最后net(x)输出的结果的形状就是:

torch.Size([12, 8])

batch中12个样本(状态)对应的8种行为分别对应的Q值。说明网络结构没问题,那么非线性表达就完成了。

6.3.2 训练过程

#!/usr/bin/env python3
from lib import wrappers
from lib import dqn_model

import argparse
import time
import numpy as np
import collections

import torch
import torch.nn as nn
import torch.optim as optim

from tensorboardX import SummaryWriter

# 一些初始化参数的设定
DEFAULT_ENV_NAME = "PongNoFrameskip-v4"
MEAN_REWARD_BOUND = 19.5     

GAMMA = 0.99
BATCH_SIZE = 32
REPLAY_SIZE = 10000           
LEARNING_RATE = 1e-4
SYNC_TARGET_FRAMES = 1000
REPLAY_START_SIZE = 10000

# epsilon参数衰减设定
EPSILON_DECAY_LAST_FRAME = 10**5
EPSILON_START = 1.0
EPSILON_FINAL = 0.02


Experience = collections.namedtuple('Experience', field_names=['state''action''reward''done''new_state'])  
# 用namedtuple表示经验数据的结构,训练数据由此生成


class ExperienceBuffer:
    def __init__(self, capacity):
        # 由于ExperienceBuffer容量固定,所以deque定义buffer类型
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):   # 返回buffer长度
        return len(self.buffer)

    def append(self, experience):  # buffer中增加数据
        self.buffer.append(experience)

    def sample(self, batch_size):  # 从buffer中抽样获得一个batch的数据
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        
        # 按照随机选取的位置抽取数据,用zip(*)解压分别返回
        # 因为buffer为Experience组成的deque
        # 抽样结果把states、actions等用zip分别拆开,再返回ndarray的形式
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])  
        return np.array(states), np.array(actions), np.array(rewards, dtype=np.float32), 
               np.array(dones, dtype=np.uint8), np.array(next_states)


class Agent:
    def __init__(self, env, exp_buffer):
        self.env = env    # 初始化给定环境
        self.exp_buffer = exp_buffer   # 初始化给定训练数据池exp_buffer
        self._reset()   # 重置环境至最初状态

    def _reset(self):
        self.state = env.reset()  # 环境初始化
        self.total_reward = 0.0   # reward重置为0.0

    def play_step(self, net, epsilon=0.0, device="cpu"):
        done_reward = None

        if np.random.random() < epsilon:
            action = env.action_space.sample()   # 以epsilon的概率随机选择action
        else:
            state_a = np.array([self.state], copy=False)
            state_v = torch.tensor(state_a).to(device)  
            q_vals_v = net(state_v)   # 用网络估算当前状态的Q值向量
            _, act_v = torch.max(q_vals_v, dim=1# 最大化Q值选择行动
            action = int(act_v.item())

        # do step in the environment
        new_state, reward, is_done, _ = self.env.step(action) 
        # 获取新的状态、当前回报,并判定是否到终点
        self.total_reward += reward  # 累积回报
        
        # 打包新的经验数据
        exp = Experience(self.state, action, reward, is_done, new_state)
        # 把新经验放入exp_buffer
        self.exp_buffer.append(exp)
        self.state = new_state
        if is_done:
            done_reward = self.total_reward
            self._reset()
        return done_reward


# 计算DQN的Q网络的损失
def calc_loss(batch, net, tgt_net, device="cpu"):
    states, actions, rewards, dones, next_states = batch

    states_v = torch.tensor(states).to(device)
    next_states_v = torch.tensor(next_states).to(device)
    actions_v = torch.tensor(actions).to(device)
    rewards_v = torch.tensor(rewards).to(device)
    done_mask = torch.ByteTensor(dones).to(device)
    
    # 用网络net计算当前状态行为的值
    state_action_values = net(states_v).gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
    # 用目标网络tgt_net计算下一个状态(最大化Q值对应行为)的值
    next_state_values = tgt_net(next_states_v).max(1)[0]
    # 游戏终结,episode结束,下一个状态的值为0.0
    next_state_values[done_mask] = 0.0
    next_state_values = next_state_values.detach()
    
    # 期望的状态行为值,为未来的状态行为值乘GAMMA折现后加上兑现的reward
    expected_state_action_values = next_state_values * GAMMA + rewards_v
    # 比较期望状态行为值和当前状态行为值的差值并返回
    return nn.MSELoss()(state_action_values, expected_state_action_values)


if __name__ == "__main__":
    # 前面这一堆主要用来命令行跑程序时喂给程序的参数设定。不熟悉的去了解下CLI。
    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda")
    parser.add_argument("--env", default=DEFAULT_ENV_NAME,
                        help="Name of the environment, default=" + DEFAULT_ENV_NAME)
    parser.add_argument("--reward", type=float, default=MEAN_REWARD_BOUND,
                        help="Mean reward boundary for stop of training, default=%.2f" % MEAN_REWARD_BOUND)
    args = parser.parse_args()
    
    # 判定是否使用GPU训练
    device = torch.device("cuda" if args.cuda else "cpu")
    
    # 设定环境,这里涉及到游戏环境的封装,不准备讲wrappers了。
    env = wrappers.make_env(args.env)
    
    # 初始化两个同结构的Q网络
    net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)
    tgt_net = dqn_model.DQN(env.observation_space.shape, env.action_space.n).to(device)
    writer = SummaryWriter(comment="-" + args.env)
    print(net)

    buffer = ExperienceBuffer(REPLAY_SIZE)
    agent = Agent(env, buffer)
    epsilon = EPSILON_START

    optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
    total_rewards = []
    frame_idx = 0
    ts_frame = 0
    ts = time.time()
    best_mean_reward = None

    while True:
        frame_idx += 1
        # epsilon的衰减
        epsilon = max(EPSILON_FINAL, EPSILON_START - frame_idx / EPSILON_DECAY_LAST_FRAME)

        reward = agent.play_step(net, epsilon, device=device)
        if reward is not None:
            total_rewards.append(reward)
            speed = (frame_idx - ts_frame) / (time.time() - ts)
            ts_frame = frame_idx
            ts = time.time()
            mean_reward = np.mean(total_rewards[-100:])
            print("%d: done %d games, mean reward %.3f, eps %.2f, speed %.2f f/s" % (
                frame_idx, len(total_rewards), mean_reward, epsilon,
                speed
            ))
            writer.add_scalar("epsilon", epsilon, frame_idx)
            writer.add_scalar("speed", speed, frame_idx)
            writer.add_scalar("reward_100", mean_reward, frame_idx)
            writer.add_scalar("reward", reward, frame_idx)
            if best_mean_reward is None or best_mean_reward < mean_reward:
                torch.save(net.state_dict(), args.env + "-best.dat")
                if best_mean_reward is not None:
                    print("Best mean reward updated %.3f -> %.3f, model saved" % (best_mean_reward, mean_reward))
                best_mean_reward = mean_reward
            if mean_reward > args.reward:
                print("Solved in %d frames!" % frame_idx)
                break

        if len(buffer) < REPLAY_START_SIZE:
            continue
        
        # 一段时间同步一次网络结构参数
        if frame_idx % SYNC_TARGET_FRAMES == 0:
            tgt_net.load_state_dict(net.state_dict())

        optimizer.zero_grad()
        batch = buffer.sample(BATCH_SIZE)
        loss_t = calc_loss(batch, net, tgt_net, device=device)
        loss_t.backward()
        optimizer.step()
    writer.close()

6.3.3 小结

本来还有一个wrapper模块。但考虑到现实中其他场景的应用并不一定跟这里的游戏环境一样。所以经常有擅长调包的朋友问我做Reinforcement Learning的时候用的什么包。我觉得很不好回答,只好说去看看gym吧。

其实吧,gym也是给大家提供了一些常用实验环境的框架,有兴趣的可以去看看源码。但我觉得并不是那么必要。真实要理解的还是RL的方法和思路。很多应用场景需要我们自己去搭建env的。

而且RL是一个非常开放的体系,大家都是在设定机器人,很多设定不一样得到的结果天差地别。我们选取的超参数决定了机器的“性格”,行动的约束等等都大概率锁定了机器最佳行动策略policy的选择。

这篇大家理解两个东西就好了。

  1. 状态行为价值这个映射关系,当状态复杂到一定程度,不能用表格法来表达的时候,我们可以用一个网络结构来表达这种映射关系。由原来的迭代表格中的值,变为迭代网络结构中参数。
  2. DQN的训练过程、算法、net和tgt_net、replay_buffer等等。

另有一个需要注意的地方:上面例子的场景是游戏场景,每个episode有game over的时候。而现实中我们可能会面临no-terminal的场景。单步reward的计算,total_reward的评估跟上面的流程有稍许差异


原文始发于微信公众号(拒绝拍脑袋):【授人以渔】RL系列(4)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/54875.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!