• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

深度强化学习(7) SAC 模型,附Pytorch完整代码

武飞扬头像
立Sir
帮助2

大家好,今天和各位分享一下 SAC (Soft Actor Critic) 算法,一种基于最大熵的无模型的深度强化学习算法。基于 OpenAI 的 gym 环境完成一个小案例,完整代码可以从我的 GitHub 中获得:

https://github.com/LiSir-HIT/Reinforcement-Learning/tree/main/Model


1. 基本原理

Deepmind 提出的 SAC (Soft Actor Critic) 算法是一种基于最大熵的无模型的深度强化学习算法,适合于真实世界的机器人学习技能。SAC 算法的效率非常高,它解决了离散动作空间和连续性动作空间的强化学习问题。SAC 算法在以最大化未来累积奖励的基础上引入了最大熵的概念,加入熵的目的是增强鲁棒性和智能体的探索能力SAC 算法的目的是使未来累积奖励值和熵最大化使得策略尽可能随机,即每个动作输出的概率尽可能的分散,而不是集中在一个动作上

SAC 算法的目标函数表达式如下: 

学新通

其中 T 表示智能体与环境互动的总时间步数,学新通表示在策略 学新通 下 学新通 的分布,学新通 代表熵值,学新通 代表超参数,它的目的是控制最优策略的随机程度和权衡熵相对于奖励的重要性。


2. 公式推导

SAC 是一种基于最大化熵理论的算法。由于目标函数中加入熵值,这使得该算法的探索能力和鲁棒性得到了很大的提升,尽可能的在奖励值和熵值(即策略的随机性)之间取得最大化平衡智能体因选择动作的随机性(更高的熵)而获得更高的奖励值,以使它不要过早收敛到某个次优确定性策略,即局部最优解。熵值越大,对环境的探索就越多,避免了策略收敛至局部最优,从而可以加快后续的学习速度

因此,最优策略的 SAC 公式定义为:

学新通

学新通

其中 学新通 用来更新已找到最大总奖励的策略学新通 是熵正则化系数,用来控制熵的重要程度; 学新通 代表熵值熵值越大,智能体对环境的探索度越大,使智能体能够找到一个更高效的策略,有助于加快后续的策略学习。

SAC 的 Q 值可以用基于熵值改进的贝尔曼方差来计算,价值函数定义如下:

学新通

其中,学新通 从经验回放池 D 中采样获得,状态价值函数定义如下:

学新通

它表示在某个状态下预期得到的奖励。此外,SAC 中的策略网络 学新通,软状态价值网络  学新通 ,目标状态价值网络网络 学新通,以及 2 个软 Q 网络 学新通,它们分别由 学新通 参数化。

因此 SAC 中包含 5 个神经网络:策略网络 学新通行为价值函数 学新通目标函数 学新通行为价值函数 学新通。为了分别找到最优策略,将随机梯度下降法应用于他们的目标函数中。 

学新通

此外,还采用了类似于双 Q 网络的形式,软 Q 值的最小值取两个由 学新通 和 学新通 参数化的 Q 值函数,这有助于避免过高估计不恰当的 Q 值,以提高训练速度。软 Q 值函数通过最小化贝尔曼误差来更新:

学新通

策略网络通过最小化 Kullback-Leibler(KL) 散度来更新:

学新通

算法流程如下:

学新通


3. 代码实现

这里以离散问题为例构建SAC,离线学习,代码如下:

  1.  
    # 处理离散问题的模型
  2.  
    import torch
  3.  
    from torch import nn
  4.  
    from torch.nn import functional as F
  5.  
    import numpy as np
  6.  
    import collections
  7.  
    import random
  8.  
     
  9.  
    # ----------------------------------------- #
  10.  
    # 经验回放池
  11.  
    # ----------------------------------------- #
  12.  
     
  13.  
    class ReplayBuffer:
  14.  
    def __init__(self, capacity): # 经验池容量
  15.  
    self.buffer = collections.deque(maxlen=capacity) # 队列,先进先出
  16.  
    # 经验池增加
  17.  
    def add(self, state, action, reward, next_state, done):
  18.  
    self.buffer.append((state, action, reward, next_state, done))
  19.  
    # 随机采样batch组
  20.  
    def sample(self, batch_size):
  21.  
    transitions = random.sample(self.buffer, batch_size)
  22.  
    # 取出这batch组数据
  23.  
    state, action, reward, next_state, done = zip(*transitions)
  24.  
    return np.array(state), action, reward, np.array(next_state), done
  25.  
    # 当前时刻的经验池容量
  26.  
    def size(self):
  27.  
    return len(self.buffer)
  28.  
     
  29.  
    # ----------------------------------------- #
  30.  
    # 策略网络
  31.  
    # ----------------------------------------- #
  32.  
     
  33.  
    class PolicyNet(nn.Module):
  34.  
    def __init__(self, n_states, n_hiddens, n_actions):
  35.  
    super(PolicyNet, self).__init__()
  36.  
    self.fc1 = nn.Linear(n_states, n_hiddens)
  37.  
    self.fc2 = nn.Linear(n_hiddens, n_actions)
  38.  
    # 前向传播
  39.  
    def forward(self, x): # 获取当前状态下的动作选择概率
  40.  
    x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
  41.  
    x = F.relu(x)
  42.  
    x = self.fc2(x) # [b,n_hiddens]-->[b,n_actions]
  43.  
    # 每个状态下对应的每个动作的动作概率
  44.  
    x = F.softmax(x, dim=1) # [b,n_actions]
  45.  
    return x
  46.  
     
  47.  
    # ----------------------------------------- #
  48.  
    # 价值网络
  49.  
    # ----------------------------------------- #
  50.  
     
  51.  
    class ValueNet(nn.Module):
  52.  
    def __init__(self, n_states, n_hiddens, n_actions):
  53.  
    super(ValueNet, self).__init__()
  54.  
    self.fc1 = nn.Linear(n_states, n_hiddens)
  55.  
    self.fc2 = nn.Linear(n_hiddens, n_actions)
  56.  
    # 当前时刻的state_value
  57.  
    def forward(self, x):
  58.  
    x = self.fc1(x) # [b,n_states]-->[b,n_hiddens]
  59.  
    x = F.relu(x)
  60.  
    x = self.fc2(x) # [b,n_hiddens]-->[b,n_actions]
  61.  
    return x
  62.  
     
  63.  
    # ----------------------------------------- #
  64.  
    # 模型构建
  65.  
    # ----------------------------------------- #
  66.  
     
  67.  
    class SAC:
  68.  
    def __init__(self, n_states, n_hiddens, n_actions,
  69.  
    actor_lr, critic_lr, alpha_lr,
  70.  
    target_entropy, tau, gamma, device):
  71.  
     
  72.  
    # 实例化策略网络
  73.  
    self.actor = PolicyNet(n_states, n_hiddens, n_actions).to(device)
  74.  
    # 实例化第一个价值网络--预测
  75.  
    self.critic_1 = ValueNet(n_states, n_hiddens, n_actions).to(device)
  76.  
    # 实例化第二个价值网络--预测
  77.  
    self.critic_2 = ValueNet(n_states, n_hiddens, n_actions).to(device)
  78.  
    # 实例化价值网络1--目标
  79.  
    self.target_critic_1 = ValueNet(n_states, n_hiddens, n_actions).to(device)
  80.  
    # 实例化价值网络2--目标
  81.  
    self.target_critic_2 = ValueNet(n_states, n_hiddens, n_actions).to(device)
  82.  
     
  83.  
    # 预测和目标的价值网络的参数初始化一样
  84.  
    self.target_critic_1.load_state_dict(self.critic_1.state_dict())
  85.  
    self.target_critic_2.load_state_dict(self.critic_2.state_dict())
  86.  
     
  87.  
    # 策略网络的优化器
  88.  
    self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
  89.  
    # 目标网络的优化器
  90.  
    self.critic_1_optimizer = torch.optim.Adam(self.critic_1.parameters(), lr=critic_lr)
  91.  
    self.critic_2_optimizer = torch.optim.Adam(self.critic_2.parameters(), lr=critic_lr)
  92.  
     
  93.  
    # 初始化可训练参数alpha
  94.  
    self.log_alpha = torch.tensor(np.log(0.01), dtype=torch.float)
  95.  
    # alpha可以训练求梯度
  96.  
    self.log_alpha.requires_grad = True
  97.  
    # 定义alpha的优化器
  98.  
    self.log_alpha_optimizer = torch.optim.Adam([self.log_alpha], lr=alpha_lr)
  99.  
     
  100.  
    # 属性分配
  101.  
    self.target_entropy = target_entropy
  102.  
    self.gamma = gamma
  103.  
    self.tau = tau
  104.  
    self.device = device
  105.  
     
  106.  
    # 动作选择
  107.  
    def take_action(self, state): # 输入当前状态 [n_states]
  108.  
    # 维度变换 numpy[n_states]-->tensor[1,n_states]
  109.  
    state = torch.tensor(state[np.newaxis,:], dtype=torch.float).to(self.device)
  110.  
    # 预测当前状态下每个动作的概率 [1,n_actions]
  111.  
    probs = self.actor(state)
  112.  
    # 构造与输出动作概率相同的概率分布
  113.  
    action_dist = torch.distributions.Categorical(probs)
  114.  
    # 从当前概率分布中随机采样tensor-->int
  115.  
    action = action_dist.sample().item()
  116.  
    return action
  117.  
     
  118.  
    # 计算目标,当前状态下的state_value
  119.  
    def calc_target(self, rewards, next_states, dones):
  120.  
    # 策略网络预测下一时刻的state_value [b,n_states]-->[b,n_actions]
  121.  
    next_probs = self.actor(next_states)
  122.  
    # 对每个动作的概率计算ln [b,n_actions]
  123.  
    next_log_probs = torch.log(next_probs 1e-8)
  124.  
    # 计算熵 [b,1]
  125.  
    entropy = -torch.sum(next_probs * next_log_probs, dim=1, keepdims=True)
  126.  
    # 目标价值网络,下一时刻的state_value [b,n_actions]
  127.  
    q1_value = self.target_critic_1(next_states)
  128.  
    q2_value = self.target_critic_2(next_states)
  129.  
    # 取出最小的q值 [b, 1]
  130.  
    min_qvalue = torch.sum(next_probs * torch.min(q1_value,q2_value), dim=1, keepdims=True)
  131.  
    # 下个时刻的state_value [b, 1]
  132.  
    next_value = min_qvalue self.log_alpha.exp() * entropy
  133.  
     
  134.  
    # 时序差分,目标网络输出当前时刻的state_value [b, n_actions]
  135.  
    td_target = rewards self.gamma * next_value * (1-dones)
  136.  
    return td_target
  137.  
     
  138.  
    # 软更新,每次训练更新部分参数
  139.  
    def soft_update(self, net, target_net):
  140.  
    # 遍历预测网络和目标网络的参数
  141.  
    for param_target, param in zip(target_net.parameters(), net.parameters()):
  142.  
    # 预测网络的参数赋给目标网络
  143.  
    param_target.data.copy_(param_target.data*(1-self.tau) param.data*self.tau)
  144.  
     
  145.  
    # 模型训练
  146.  
    def update(self, transition_dict):
  147.  
    # 提取数据集
  148.  
    states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) # [b,n_states]
  149.  
    actions = torch.tensor(transition_dict['actions']).view(-1,1).to(self.device) # [b,1]
  150.  
    rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1,1).to(self.device) # [b,1]
  151.  
    next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) # [b,n_states]
  152.  
    dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1,1).to(self.device) # [b,1]
  153.  
     
  154.  
    # --------------------------------- #
  155.  
    # 更新2个价值网络
  156.  
    # --------------------------------- #
  157.  
     
  158.  
    # 目标网络的state_value [b, 1]
  159.  
    td_target = self.calc_target(rewards, next_states, dones)
  160.  
    # 价值网络1--预测,当前状态下的动作价值 [b, 1]
  161.  
    critic_1_qvalues = self.critic_1(states).gather(1, actions)
  162.  
    # 均方差损失 预测-目标
  163.  
    critic_1_loss = torch.mean(F.mse_loss(critic_1_qvalues, td_target.detach()))
  164.  
    # 价值网络2--预测
  165.  
    critic_2_qvalues = self.critic_2(states).gather(1, actions)
  166.  
    # 均方差损失
  167.  
    critic_2_loss = torch.mean(F.mse_loss(critic_2_qvalues, td_target.detach()))
  168.  
     
  169.  
    # 梯度清0
  170.  
    self.critic_1_optimizer.zero_grad()
  171.  
    self.critic_2_optimizer.zero_grad()
  172.  
    # 梯度反传
  173.  
    critic_1_loss.backward()
  174.  
    critic_2_loss.backward()
  175.  
    # 梯度更新
  176.  
    self.critic_1_optimizer.step()
  177.  
    self.critic_2_optimizer.step()
  178.  
     
  179.  
    # --------------------------------- #
  180.  
    # 更新策略网络
  181.  
    # --------------------------------- #
  182.  
     
  183.  
    probs = self.actor(states) # 预测当前时刻的state_value [b,n_actions]
  184.  
    log_probs = torch.log(probs 1e-8) # 小于0 [b,n_actions]
  185.  
    # 计算策略网络的熵>0 [b,1]
  186.  
    entropy = -torch.sum(probs * log_probs, dim=1, keepdim=True)
  187.  
     
  188.  
    # 价值网络预测当前时刻的state_value
  189.  
    q1_value = self.critic_1(states) # [b,n_actions]
  190.  
    q2_value = self.critic_2(states)
  191.  
    # 取出价值网络输出的最小的state_value [b,1]
  192.  
    min_qvalue = torch.sum(probs * torch.min(q1_value, q2_value), dim=1, keepdim=True)
  193.  
     
  194.  
    # 策略网络的损失
  195.  
    actor_loss = torch.mean(-self.log_alpha.exp() * entropy - min_qvalue)
  196.  
    # 梯度更新
  197.  
    self.actor_optimizer.zero_grad()
  198.  
    actor_loss.backward()
  199.  
    self.actor_optimizer.step()
  200.  
     
  201.  
    # --------------------------------- #
  202.  
    # 更新可训练遍历alpha
  203.  
    # --------------------------------- #
  204.  
     
  205.  
    alpha_loss = torch.mean((entropy-self.target_entropy).detach() * self.log_alpha.exp())
  206.  
    # 梯度更新
  207.  
    self.log_alpha_optimizer.zero_grad()
  208.  
    alpha_loss.backward()
  209.  
    self.log_alpha_optimizer.step()
  210.  
     
  211.  
    # 软更新目标价值网络
  212.  
    self.soft_update(self.critic_1, self.target_critic_1)
  213.  
    self.soft_update(self.critic_2, self.target_critic_2)
学新通

4. 案例演示

基于 OpenAI 的 gym 环境完成一个推车游戏,一个离散的环境,目标是左右移动小车将黄色的杆子保持竖直。动作维度为2,属于离散值;状态维度为 4,分别是坐标、速度、角度、角速度。

学新通

学新通

训练部分的代码如下:

  1.  
    import gym
  2.  
    import torch
  3.  
    import numpy as np
  4.  
    import matplotlib.pyplot as plt
  5.  
    from RL_brain import ReplayBuffer, SAC
  6.  
     
  7.  
    # -------------------------------------- #
  8.  
    # 参数设置
  9.  
    # -------------------------------------- #
  10.  
     
  11.  
    num_epochs = 100 # 训练回合数
  12.  
    capacity = 500 # 经验池容量
  13.  
    min_size = 200 # 经验池训练容量
  14.  
    batch_size = 64
  15.  
    n_hiddens = 64
  16.  
    actor_lr = 1e-3 # 策略网络学习率
  17.  
    critic_lr = 1e-2 # 价值网络学习率
  18.  
    alpha_lr = 1e-2 # 课训练变量的学习率
  19.  
    target_entropy = -1
  20.  
    tau = 0.005 # 软更新参数
  21.  
    gamma = 0.9 # 折扣因子
  22.  
    device = torch.device('cuda') if torch.cuda.is_available() \
  23.  
    else torch.device('cpu')
  24.  
     
  25.  
    # -------------------------------------- #
  26.  
    # 环境加载
  27.  
    # -------------------------------------- #
  28.  
     
  29.  
    env_name = "CartPole-v1"
  30.  
    env = gym.make(env_name, render_mode="human")
  31.  
    n_states = env.observation_space.shape[0] # 状态数 4
  32.  
    n_actions = env.action_space.n # 动作数 2
  33.  
     
  34.  
    # -------------------------------------- #
  35.  
    # 模型构建
  36.  
    # -------------------------------------- #
  37.  
     
  38.  
    agent = SAC(n_states = n_states,
  39.  
    n_hiddens = n_hiddens,
  40.  
    n_actions = n_actions,
  41.  
    actor_lr = actor_lr,
  42.  
    critic_lr = critic_lr,
  43.  
    alpha_lr = alpha_lr,
  44.  
    target_entropy = target_entropy,
  45.  
    tau = tau,
  46.  
    gamma = gamma,
  47.  
    device = device,
  48.  
    )
  49.  
     
  50.  
    # -------------------------------------- #
  51.  
    # 经验回放池
  52.  
    # -------------------------------------- #
  53.  
     
  54.  
    buffer = ReplayBuffer(capacity=capacity)
  55.  
     
  56.  
    # -------------------------------------- #
  57.  
    # 模型构建
  58.  
    # -------------------------------------- #
  59.  
     
  60.  
    return_list = [] # 保存每回合的return
  61.  
     
  62.  
    for i in range(num_epochs):
  63.  
    state = env.reset()[0]
  64.  
    epochs_return = 0 # 累计每个时刻的reward
  65.  
    done = False # 回合结束标志
  66.  
     
  67.  
    while not done:
  68.  
    # 动作选择
  69.  
    action = agent.take_action(state)
  70.  
    # 环境更新
  71.  
    next_state, reward, done, _, _ = env.step(action)
  72.  
    # 将数据添加到经验池
  73.  
    buffer.add(state, action, reward, next_state, done)
  74.  
    # 状态更新
  75.  
    state = next_state
  76.  
    # 累计回合奖励
  77.  
    epochs_return = reward
  78.  
     
  79.  
    # 经验池超过要求容量,就开始训练
  80.  
    if buffer.size() > min_size:
  81.  
    s, a, r, ns, d = buffer.sample(batch_size) # 每次取出batch组数据
  82.  
    # 构造数据集
  83.  
    transition_dict = {'states': s,
  84.  
    'actions': a,
  85.  
    'rewards': r,
  86.  
    'next_states': ns,
  87.  
    'dones': d}
  88.  
    # 模型训练
  89.  
    agent.update(transition_dict)
  90.  
    # 保存每个回合return
  91.  
    return_list.append(epochs_return)
  92.  
     
  93.  
    # 打印回合信息
  94.  
    print(f'iter:{i}, return:{np.mean(return_list[-10:])}')
  95.  
     
  96.  
    # -------------------------------------- #
  97.  
    # 绘图
  98.  
    # -------------------------------------- #
  99.  
     
  100.  
    plt.plot(return_list)
  101.  
    plt.title('return')
  102.  
    plt.show()
学新通

运行100个回合,绘制每个回合的 return

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhefajgg
系列文章
更多 icon
同类精品
更多 icon
继续加载