Skip to content

三种经典方法

复制本地路径 | 在线编辑

我们的目标,让价值最大,回顾其贝尔曼方程:

\[ V_{\pi}(s)=\sum_{a \in A} \pi(a \mid s)\left(R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) V_{\pi}\left(s^{\prime}\right)\right) \]

上面的式子看起来复杂,其实很简单,就是当前状态的价值等于当前状态的奖励加上未来状态价值的折扣总和。其实就是从一个简单方程推来的,在第一章的时候推导过。

要理解好上面的式子,需要先理解好 \(\pi(a \mid s)\)\(p(s' \mid s,a)\) 的区别,如果不行则前面章节没有掌握好。

这一章节看代码更好理解。

动态规划

上一章已经介绍过了,动态规划。其实他就是迭代更新,如下所示:

\[ V_{\bold{k+1}}(s)=\sum_{a \in A} \pi(a \mid s)\left(R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) V_{\bold{k}}\left(s^{\prime}\right)\right) \]

其中 \(V_{k}\) 是第 k 次迭代时状态 s 的价值函数估计,所以只是把 \(V\) 的下标给改了。这么说也可以,但意义其实发生了很大的变化。

这样不断迭代,直到最终收敛。有严格证明这样是能收敛的,但是太过学术和数学化,这里不展开讨论,可以去下方链接的最后查看:

看代码:

动态规划
import numpy as np
import gym
from gym import spaces

class MazeEnv(gym.Env):
    """
    自定义迷宫环境,继承自 gym.Env
    """
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(MazeEnv, self).__init__()
        # 定义动作空间和状态空间
        self.action_space = spaces.Discrete(4)
        self.maze_size = (5, 5)
        self.observation_space = spaces.Box(low=0, high=4, shape=(2,), dtype=np.int32)

        # 定义迷宫(0 表示空地,-1 表示墙壁)
        self.maze = np.zeros(self.maze_size)
        self.maze[0, 3] = -1  # 墙壁位置
        self.maze[1, 1] = -1
        self.maze[1, 3] = -1
        self.maze[2, 1] = -1
        self.maze[3, 3] = -1
        self.maze[4, 1] = -1

        # 起点和终点
        self.start_pos = (0, 0)
        self.goal_pos = (0, 4)

        # 智能体初始位置
        self.agent_pos = self.start_pos

    def step(self, action):
        directions = {
            0: (-1, 0),  # 上
            1: (1, 0),   # 下
            2: (0, -1),  # 左
            3: (0, 1)    # 右
        }

        move = directions[action]
        new_pos = (self.agent_pos[0] + move[0], self.agent_pos[1] + move[1])

        # 检查新位置是否在迷宫范围内
        if (0 <= new_pos[0] < self.maze_size[0]) and (0 <= new_pos[1] < self.maze_size[1]):
            # 检查新位置是否是墙壁
            if self.maze[new_pos] != -1:
                self.agent_pos = new_pos  # 更新位置
                reward = -1
                done = False
            else:
                # 撞到墙壁
                reward = -5
                done = False
        else:
            # 越界
            reward = -5
            done = False

        # 检查是否到达终点
        if self.agent_pos == self.goal_pos:
            reward = 10
            done = True

        obs = np.array(self.agent_pos)
        info = {}
        return obs, reward, done, info

    def reset(self):
        self.agent_pos = self.start_pos
        return np.array(self.agent_pos)

    def render(self, mode='human'):
        maze_render = np.copy(self.maze)
        maze_render[self.agent_pos] = 2  # 智能体
        maze_render[self.start_pos] = 3  # 起点
        maze_render[self.goal_pos] = 4   # 终点
        symbol_map = {
            -1: 'W',  # 墙壁
            0: ' ',
            2: 'A',   # 智能体
            3: 'S',   # 起点
            4: 'G'    # 终点
        }
        print("\n".join(["".join([symbol_map[item] for item in row]) for row in maze_render]))
        print()

def policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=1000):
    """
    使用策略迭代求解迷宫。
    gamma: 折扣因子(小于1,避免负循环)
    theta: 收敛阈值
    max_iter: 策略迭代的最大迭代次数,防止死循环
    """

    # 1) 收集所有非墙壁状态
    states = []
    for r in range(env.maze_size[0]):
        for c in range(env.maze_size[1]):
            if env.maze[r, c] != -1: 
                states.append((r, c))

    actions = [0, 1, 2, 3]  # 上、下、左、右

    def step_in_model(state, action):
        # 终点不需要再动
        if state == env.goal_pos:
            return state, 0.0, True

        directions = {
            0: (-1, 0),
            1: (1, 0),
            2: (0, -1),
            3: (0, 1)
        }
        move = directions[action]
        new_state = (state[0] + move[0], state[1] + move[1])

        if not (0 <= new_state[0] < env.maze_size[0] and 0 <= new_state[1] < env.maze_size[1]):
            # 越界
            return state, -5, False
        if env.maze[new_state] == -1:
            # 撞墙
            return state, -5, False

        # 正常移动
        reward = -1
        done = False
        if new_state == env.goal_pos:
            reward = 10
            done = True
        return new_state, reward, done

    # 2) 初始化策略、价值函数
    pi = {}
    V = {}
    for s in states:
        if s == env.goal_pos:
            pi[s] = None
            V[s] = 0.0
        else:
            pi[s] = np.random.choice(actions)
            V[s] = 0.0

    # 3) 策略迭代
    iter_count = 0
    while True:
        iter_count += 1
        if iter_count > max_iter:
            print("超过最大迭代次数,提前退出,可能未完全收敛。")
            break

        # ========== (A) 策略评估 ==========
        while True:
            delta = 0
            for s in states:
                if s == env.goal_pos:
                    continue
                v_old = V[s]
                a = pi[s]
                s_next, r, done = step_in_model(s, a)
                if done:
                    V[s] = r
                else:
                    V[s] = r + gamma * V[s_next]
                delta = max(delta, abs(V[s] - v_old))
            if delta < theta:
                break

        # ========== (B) 策略改进 ==========
        policy_stable = True
        for s in states:
            if s == env.goal_pos:
                continue
            old_a = pi[s]

            best_a = None
            best_q = float('-inf')
            for a in actions:
                s_next, r, done = step_in_model(s, a)
                q_sa = r if done else (r + gamma * V[s_next])
                if q_sa > best_q:
                    best_q = q_sa
                    best_a = a

            pi[s] = best_a
            if best_a != old_a:
                policy_stable = False

        if policy_stable:
            print(f"策略在迭代 {iter_count} 次后稳定。")
            break

    return pi, V

if __name__ == "__main__":
    env = MazeEnv()
    policy, value = policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=200)

    # 查看价值函数与策略
    print("最终价值函数(部分):")
    for s in sorted(value.keys()):
        print(f"State {s}: V = {value[s]:.2f}")

    print("\n最终策略:")
    action_dict = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    for s in sorted(policy.keys()):
        if policy[s] is None:
            print(f"{s} -> 终点")
        else:
            print(f"{s} -> {action_dict[policy[s]]}")

    # 用最终策略跑一遍环境
    obs = env.reset()
    env.render()
    done = False
    step_count = 0
    while not done and step_count < 50:
        s = tuple(obs)
        action = policy[s]
        obs, reward, done, _ = env.step(action)
        env.render()
        step_count += 1

    print("Episode finished!")

重要说明

这一段话很重要

下面的话很重要,一定要先看这段话才能继续看其他两种方法。

看起来动态规划就很好了。但从上面公式看出,我们需要掌握研究问题的许多本质,包括状态转移概率和奖励函数。在实际问题中,环境模型通常是未知的或者计算量过大。

所以这是后面两种方法的意义:我们能否在盲盒的情况下,找到最佳策略?这里还是把开头所写的贝尔曼方程放过来,此时 \(\pi\)\(p\) 都不太好知道:

\[ V_{\pi}(s)=\sum_{a \in A} \pi(a \mid s)\left(R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) V_{\pi}\left(s^{\prime}\right)\right) \]

蒙特卡洛方法(MC)

很纯粹的思想,蒙特卡洛,也就是大量试验,然后计算每一条序列的总奖励(即回报),最后取平均值,来近似估计价值函数。每次得到一个轨迹,计算状态 s 的回报 \(G_t\),最终:

\[ V_\pi(s) = \frac{1}{N} \sum_{i=1}^{N} G_t^{i} \]

增量更新

实际中,通常使用增量更新,看下面方程就理解了:

\[ V_\pi(s) \leftarrow V_\pi(s) + \eta (G_t - V_\pi(s)) \]

其中 \(\eta\) 是学习率,控制了我们对新信息的信任程度。如果很大(比如接近 1),那么价值函数的更新会比较激进,每次都大幅度地向着新的回报值靠拢。反之,如果很小,那么价值函数的更新会比较保守,每次都只稍微调整一点。而 \(G_t - V_\pi(s)\) 是误差项。这个公式代表了一个朴素的直觉:一次采样的回报如果大于当前价值函数的估计值,那么很可能说明,我们的估计值偏低了,需要变高一点;反则反之。

实际学习 Q

实际情况其实我们计算的是 \(Q(s, a)\),在进行策略采样时,我们从状态 s 出发时,必须强制执行动作 a,然后算出 \(G_t\)

\[ Q_\pi(s, a) \leftarrow Q_\pi(s, a) + \eta (G_t - Q_\pi(s, a)) \]

这一段有点抽象,建议看代码理解。

使用 ε-greedy 方法

有了 Q,我们就可以用「贪婪化」的方式进行策略改进了:

\[ a^* = \argmax_{a} Q(s, a) \]

但是,仅仅使用「贪婪化」的方式存在收敛性的问题。我们通过模拟轨迹来估计价值函数,如果仅仅是按照当前已知的最优策略(即「利用」)进行采样,那么智能体可能会一直困在它已经知道的状态空间中,无法发现新的、可能更好的状态和动作。

为了保证算法的收敛性,我们常使用 ε-greedy 策略在探索和利用之间做一个平衡。这意味着在大多数情况下(概率为 1- \(\epsilon\)),我们会选择当前最优的动作,但在少数情况下(概率为 \(\epsilon\)),我们会随机选择一个动作,从而保证探索的充分性。

这一段有点抽象,建议看代码理解。

代码

蒙特卡洛方法
import gym
from gym import spaces
import numpy as np
import random

class MazeEnv(gym.Env):
    """
    自定义迷宫环境,继承自 gym.Env
    """
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(MazeEnv, self).__init__()
        # 定义动作空间和状态空间
        # 动作空间:上、下、左、右
        self.action_space = spaces.Discrete(4)
        # 状态空间:智能体在迷宫中的位置(二维坐标)
        self.maze_size = (5, 5)
        self.observation_space = spaces.Box(low=0, high=4, shape=(2,), dtype=np.int32)

        # 定义迷宫(0 表示空地,-1 表示墙壁)
        self.maze = np.zeros(self.maze_size)
        self.maze[0, 3] = -1  # 墙壁位置
        self.maze[1, 1] = -1
        self.maze[1, 3] = -1
        self.maze[2, 1] = -1
        self.maze[3, 3] = -1
        self.maze[4, 1] = -1

        # 起点和终点
        self.start_pos = (0, 0)
        self.goal_pos = (0, 4)

        # 智能体初始位置
        self.agent_pos = self.start_pos

    def step(self, action):
        """
        执行动作
        """
        # 定义动作对应的移动
        directions = {
            0: (-1, 0),  # 上
            1: (1, 0),   # 下
            2: (0, -1),  # 左
            3: (0, 1)    # 右
        }
        # 根据动作计算新的位置
        move = directions[action]
        new_pos = (self.agent_pos[0] + move[0], self.agent_pos[1] + move[1])

        # 默认的奖励和终止标志
        reward = -1
        done = False

        # 检查新位置是否在迷宫范围内
        if (0 <= new_pos[0] < self.maze_size[0]) and (0 <= new_pos[1] < self.maze_size[1]):
            # 检查新位置是否是墙壁
            if self.maze[new_pos] == -1:
                # 撞到墙壁
                reward = -5
            else:
                # 合法移动
                self.agent_pos = new_pos
        else:
            # 超出迷宫范围
            reward = -5

        # 检查是否到达终点
        if self.agent_pos == self.goal_pos:
            reward = 10
            done = True

        obs = np.array(self.agent_pos)
        info = {}
        return obs, reward, done, info

    def reset(self):
        """
        重置环境到初始状态
        """
        self.agent_pos = self.start_pos
        return np.array(self.agent_pos)

    def render(self, mode='human'):
        """
        渲染迷宫环境
        """
        maze_render = np.copy(self.maze)
        maze_render[self.agent_pos] = 2  # 智能体的位置
        maze_render[self.start_pos] = 3  # 起点
        maze_render[self.goal_pos] = 4   # 终点
        symbol_map = {
            -1: 'W',  # 墙壁
            0: ' ',   # 空地
            2: 'A',   # 智能体
            3: 'S',   # 起点
            4: 'G'    # 终点
        }
        print("\n".join(["".join([symbol_map[item] for item in row]) for row in maze_render]))
        print("\n")

def mc_control_on_policy(env, num_episodes=5000, gamma=1.0, epsilon=0.1):
    """
    基于第一访问蒙特卡洛的 on-policy 控制(ε-贪心)。
    :param env: 自定义迷宫环境
    :param num_episodes: 训练的回合数
    :param gamma: 折扣因子
    :param epsilon: 探索率
    :return: Q, 最优的状态-动作价值函数
    """
    # Q 表示状态-动作价值函数,大小为 [行, 列, 动作数]
    Q = np.zeros((env.maze_size[0], env.maze_size[1], env.action_space.n))

    # 这里使用一个字典来存储每个状态-动作对的回报(列表),方便后续取平均做更新
    returns = dict()
    for r in range(env.maze_size[0]):
        for c in range(env.maze_size[1]):
            for a in range(env.action_space.n):
                returns[((r, c), a)] = []

    def epsilon_greedy_policy(state):
        """
        给定当前的 Q 和 explored state, 采用 ε-贪心策略选择动作
        """
        r, c = state
        if random.random() < epsilon:
            # 随机探索
            return np.random.choice(env.action_space.n)
        else:
            # 贪心选择
            return np.argmax(Q[r, c])

    for episode in range(num_episodes):
        # 生成一条回合(episode)
        state = env.reset()
        episode_trace = []  # 存储 (state, action, reward) 元组

        done = False
        while not done:
            action = epsilon_greedy_policy(tuple(state))
            next_state, reward, done, _ = env.step(action)
            episode_trace.append((tuple(state), action, reward))
            state = next_state

        # 回溯回合,更新 Q
        visited_state_actions = set()
        G = 0  # 从后往前计算折扣回报
        # 在这里从后向前计算更简洁(若想从前向后可先沿 episode_trace 再次扫一遍计算回报)
        for t in reversed(range(len(episode_trace))):
            s_t, a_t, r_t = episode_trace[t]
            G = gamma * G + r_t
            # 检查是否是该回合中首次出现的 (s_t, a_t)
            if (s_t, a_t) not in visited_state_actions:
                visited_state_actions.add((s_t, a_t))
                returns[(s_t, a_t)].append(G)
                # 增量方式更新 Q(s, a)
                Q[s_t[0], s_t[1], a_t] = np.mean(returns[(s_t, a_t)])
    return Q

if __name__ == "__main__":
    # 创建环境
    env = MazeEnv()

    # 使用蒙特卡洛方法进行训练
    Q = mc_control_on_policy(env, num_episodes=3000, gamma=1.0, epsilon=0.1)

    # 打印最终学到的 Q
    print("训练结束后学到的状态-动作价值函数 Q:")
    for r in range(env.maze_size[0]):
        for c in range(env.maze_size[1]):
            print(f"State=({r},{c}) -> Q={Q[r, c]}")
        print()

    # 根据学到的 Q 构造出一个贪心策略并测试
    def greedy_policy(state):
        return np.argmax(Q[state[0], state[1]])

    # 测试智能体在环境中的表现
    state = env.reset()
    env.render()
    done = False
    step_count = 0
    while not done and step_count < 50:  # 做一个简单的步数限制,防止卡死
        action = greedy_policy(tuple(state))
        next_state, reward, done, _ = env.step(action)
        state = next_state
        env.render()
        step_count += 1

    if tuple(state) == env.goal_pos:
        print("智能体成功到达目标!")
    else:
        print("智能体未能到达目标。")

时间差分方法(TD)

蒙特卡洛缺点:通过「采样」来进行「策略评估」,进而进行策略控制,得到我们想要的策略。但这样意味着: 需要等到一个完整的回合(episode)结束,才能计算出每个状态的真实收益(return),然后用这个收益去更新价值函数。学习效率比较低。对于持续性的任务(continuous task),或者步骤很长的回合制任务(episodic task),这种方法就不适合了。

回忆一下蒙特卡洛方法的原理,它的更新公式是:

\[ Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \eta (G_t - Q(s_t, a_t)) \]

这个公式中的 \(G_t\) 是采样的时候,整个回合结束之后计算得到的。而时间差分方法中,使用:

\[ Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \eta (r_{t} + \gamma Q(s_{t+1}, a_{t+1}) - Q(s_t, a_t)) \]

其中 \(r_{t}\) 就是当前操作的奖励。用一个估计值来更新另一个估计值么,左脚踩右脚,这种方法就是自举(bootstrapping),之前动态规划也讲过。时间差分方法实际是把蒙特卡洛方法的采样和动态规划的自举结合起来。

现在有困惑正常,继续看。

SARAS

第一种时间差分方法叫 SARAS,它的方程:

\[ Q_\pi(s_t, a_t) \leftarrow Q_\pi(s_t, a_t) + \eta (r_{t} + \gamma Q_\pi(s_{t+1}, a_{t+1}) - Q_\pi(s_t, a_t)) \]

和上面的公式好像一样,只是加了下标,表示策略的一致性。其中,\(a_{t+1}\) 是遵循策略 \(\pi\) 采样出来的。SARSA 算法的策略不能是「纯贪婪」,同样会采用 \(\epsilon\)-greedy 进行折中。

还是看代码更直接啊:

SARSA 代码
import numpy as np
import gym

class MazeEnv(gym.Env):
    """
    自定义迷宫环境,继承自 gym.Env
    """
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(MazeEnv, self).__init__()
        # 定义动作空间和状态空间
        # 动作空间:上、下、左、右
        self.action_space = gym.spaces.Discrete(4)
        # 状态空间:智能体在迷宫中的位置(二维坐标)
        self.maze_size = (5, 5)
        self.observation_space = gym.spaces.Box(low=0, high=4, shape=(2,), dtype=np.int32)

        # 定义迷宫(0 表示空地,-1 表示墙壁)
        self.maze = np.zeros(self.maze_size)
        self.maze[0, 3] = -1  # 墙壁位置
        self.maze[1, 1] = -1
        self.maze[1, 3] = -1
        self.maze[2, 1] = -1
        self.maze[3, 3] = -1
        self.maze[4, 1] = -1

        # 起点和终点
        self.start_pos = (0, 0)
        self.goal_pos = (0, 4)

        # 智能体初始位置
        self.agent_pos = self.start_pos

    def step(self, action):
        """
        执行动作
        """
        # 定义动作对应的移动
        directions = {
            0: (-1, 0),  # 上
            1: (1, 0),   # 下
            2: (0, -1),  # 左
            3: (0, 1)    # 右
        }
        # 根据动作计算新的位置
        move = directions[action]
        new_pos = (self.agent_pos[0] + move[0], self.agent_pos[1] + move[1])

        # 默认奖励和终止标志
        reward = -1
        done = False

        # 检查新位置是否在迷宫范围内
        if (0 <= new_pos[0] < self.maze_size[0]) and (0 <= new_pos[1] < self.maze_size[1]):
            # 检查新位置是否是墙壁
            if self.maze[new_pos] == -1:
                # 撞到墙壁
                reward = -5
            else:
                # 可以移动
                self.agent_pos = new_pos
        else:
            # 越界
            reward = -5

        # 检查是否到达终点
        if self.agent_pos == self.goal_pos:
            reward = 10
            done = True

        obs = np.array(self.agent_pos)
        info = {}
        return obs, reward, done, info

    def reset(self):
        """
        重置环境到初始状态
        """
        self.agent_pos = self.start_pos
        return np.array(self.agent_pos)

    def render(self, mode='human'):
        """
        渲染迷宫环境
        """
        maze_render = np.copy(self.maze)
        maze_render[self.agent_pos] = 2  # 智能体的位置
        maze_render[self.start_pos] = 3  # 起点
        maze_render[self.goal_pos] = 4   # 终点
        symbol_map = {
            -1: 'W',  # 墙壁
            0: ' ',   # 空地
            2: 'A',   # 智能体
            3: 'S',   # 起点
            4: 'G'    # 终点
        }
        print("\n".join(["".join([symbol_map[item] for item in row]) for row in maze_render]))
        print("\n")


def sarsa(env, num_episodes=500, alpha=0.1, gamma=0.9, epsilon=0.1):
    """
    使用SARSA算法对环境进行训练
    :param env: 训练环境
    :param num_episodes: 训练的轮数
    :param alpha: 学习率
    :param gamma: 折扣因子
    :param epsilon: epsilon-greedy 策略中的探索概率
    :return: Q表 (Q[state_row, state_col, action])
    """
    # Q表的维度:[row, col, action]
    # 这里的迷宫是5x5,动作空间是4
    Q = np.zeros((env.maze_size[0], env.maze_size[1], env.action_space.n))

    def epsilon_greedy_action(state):
        """
        在状态state下使用 epsilon-greedy 策略选择动作
        :param state: (row, col)
        :return: 动作 (int)
        """
        row, col = state
        if np.random.rand() < epsilon:
            return np.random.randint(env.action_space.n)
        else:
            return np.argmax(Q[row, col, :])

    for episode in range(num_episodes):
        # 重置环境,得到初始状态
        state = env.reset()
        state_row, state_col = state
        # 选择初始动作
        action = epsilon_greedy_action((state_row, state_col))

        done = False
        # 这里设定一个最大步数,防止某些情况下无限循环
        for _ in range(200):
            next_state, reward, done, _ = env.step(action)
            next_state_row, next_state_col = next_state

            if done:
                # 如果已经到达终点, 直接更新Q并跳出循环
                Q[state_row, state_col, action] += alpha * (reward - Q[state_row, state_col, action])
                break
            else:
                # 选择下一步动作(基于下一个状态)
                next_action = epsilon_greedy_action((next_state_row, next_state_col))
                # SARSA 更新
                Q[state_row, state_col, action] += alpha * (
                    reward + gamma * Q[next_state_row, next_state_col, next_action]
                    - Q[state_row, state_col, action]
                )
                # 状态和动作往前推进
                state_row, state_col = next_state_row, next_state_col
                action = next_action

    return Q

if __name__ == "__main__":
    env = MazeEnv()

    # 使用SARSA训练
    Q = sarsa(env, num_episodes=1000, alpha=0.1, gamma=0.9, epsilon=0.1)

    # 测试训练结果:让智能体使用贪心策略走迷宫
    state = env.reset()
    env.render()
    done = False
    total_reward = 0
    step_count = 0

    while not done and step_count < 50:
        row, col = state
        # 选取Q值最大的动作
        action = np.argmax(Q[row, col, :])
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        step_count += 1
        state = next_state
        env.render()

    print(f"测试结束,步数: {step_count}, 总奖励: {total_reward}")

Q-Learning

Q-learning 的核心思想是通过以下公式更新 Q 值:

\[ Q_\pi(s_t, a_t) \leftarrow Q_\pi(s_t, a_t) + \eta (r_{t} + \gamma \max_{a'} Q_\pi(s_{t+1}, a_{t+1}) - Q_\pi(s_t, a_t)) \]

多了一个 max 操作,也就是这里的 \(a_{t+1}\) 是下一个状态 \(s_{t+1}\) 下,价值最大的动作。而不是完全找当前策略 \(\pi\) 而得出。

还是看代码更直观:

Q-Learning
import time
import random
import numpy as np
from maze_env import MazeEnv

def q_learning(
    env,
    num_episodes=2000,
    max_steps_per_episode=100,
    learning_rate=0.1,
    discount_rate=0.99,
    max_exploration_rate=1.0,
    min_exploration_rate=0.01,
    exploration_decay_rate=0.005,
):
    """
    使用 Q-Learning 算法对环境进行训练(off-policy)。
    :param env: 训练环境
    :param num_episodes: 训练轮数
    :param max_steps_per_episode: 每轮最大步数
    :param learning_rate: 学习率
    :param discount_rate: 折扣因子
    :param max_exploration_rate: 初始探索率上限
    :param min_exploration_rate: 最小探索率
    :param exploration_decay_rate: 探索率衰减系数
    :return: Q_table, rewards_all_episodes
    """
    state_space_size = env.maze_size
    action_space_size = env.action_space.n
    Q_table = np.zeros((state_space_size[0], state_space_size[1], action_space_size))

    def epsilon_greedy_policy(state):
        if random.uniform(0, 1) > exploration_rate:
            return np.argmax(Q_table[state[0], state[1], :])
        else:
            return env.action_space.sample()

    exploration_rate = max_exploration_rate
    rewards_all_episodes = []

    for episode in range(num_episodes):
        state = env.reset()
        rewards_current_episode = 0

        for _ in range(max_steps_per_episode):
            # 探索-利用策略(epsilon-greedy)
            action = epsilon_greedy_policy(state)

            next_state, reward, done, _ = env.step(action)

            # Q-Learning 更新:使用下一状态的最大动作价值
            old_value = Q_table[state[0], state[1], action]
            next_max = np.max(Q_table[next_state[0], next_state[1], :])
            Q_table[state[0], state[1], action] = (
                (1 - learning_rate) * old_value
                + learning_rate * (reward + discount_rate * next_max)
            )

            state = next_state
            rewards_current_episode += reward

            if done:
                break

        # 探索率指数衰减
        exploration_rate = min_exploration_rate + (
            (max_exploration_rate - min_exploration_rate)
            * np.exp(-exploration_decay_rate * episode)
        )
        rewards_all_episodes.append(rewards_current_episode)

    return Q_table, rewards_all_episodes


if __name__ == "__main__":
    env = MazeEnv()

    # 使用 Q-Learning 训练
    Q_table, rewards = q_learning(
        env,
        num_episodes=2000,
        max_steps_per_episode=100,
        learning_rate=0.1,
        discount_rate=0.99,
        max_exploration_rate=1.0,
        min_exploration_rate=0.01,
        exploration_decay_rate=0.005,
    )
    print("训练结束。\n")

    # 测试训练结果:让智能体使用贪心策略走迷宫
    state = env.reset()
    done = False
    time.sleep(1)

    for _ in range(100):
        env.render()
        time.sleep(0.2)

        action = np.argmax(Q_table[state[0], state[1], :])
        next_state, reward, done, _ = env.step(action)
        state = next_state

        if done:
            env.render()
            if reward == 10:
                print("成功到达终点!")
            else:
                print("未能到达终点。")
            break

两种方法的比较

SARSA和Q学习都采用了TD的思想,但由于策略的不同,它们的行为和学习方式也不同。Q学习的策略更新更加激进,因为它直接更新的是最优的Q值;而SARSA的策略更新更加保守,因为它只更新实际执行的Q值。

SARSA就像一个「跟班」,它只学习自己实际走过的路线。它在学习的过程中,既要探索新的路线,也要根据实际走过的路线更新自己的经验。而Q-Learning就像一个「梦想家」,它总是在想象如果选择最优路线会怎么样。它在学习的过程中,既要探索新的路线,也总是按照想象中的最优路线更新自己的经验。

TD(λ)

很显然,TD 是走了一步,那肯定有多走几步方法呗,即走了 n 步的方法,这个就不细讲了。直接看如下链接就好,不过我感觉这个太细节了,如果是掌握轮廓似乎不需要知道这个:

总结

这一章主要是讲了三种经典方法,通过这几种方法,可以算出每个 \(Q(s, a)\) 的值,这样我们可以从 \(s_0\) 开始找最优动作 \(a_0\),进入状态 \(s_1\),然后再去执行 \(a_1\),最终完成任务。

Comments