Skip to content

策略迭代和价值迭代

复制本地路径 | 在线编辑

先让我们回顾一下贝尔曼方程:

\[ V_{\pi}(s)=\sum_{a \in A} \pi(a \mid s)\left(R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) V_{\pi}\left(s^{\prime}\right)\right) \]
\[ Q_{\pi}(s, a)=R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) \sum_{a^{\prime} \in A} \pi\left(a^{\prime} \mid s^{\prime}\right) Q_{\pi}\left(s^{\prime}, a^{\prime}\right) \]

我们的目标使其最大。

\(\pi(a \mid s)\)\(p(s' \mid s,a)\) 的区别是什么?

我们按照加法为例子,现在可以加一减一,当前数字为 n 时称作状态 n。

\(\pi\) 表示状态 \(s\) 的时候取动作 \(a\) 的概率。比如状态 2 的时候,加一的概率是 0.5,减一的概率是 0.5。

\(p\) 表示状态 \(s\) 做了动作 \(a\) 之后成为状态 \(s'\) 的概率。比如状态 2 的时候,我们采取动作加一,那么变成状态 3 的概率是 1,变成状态 4 的概率是 0,当然现实情况往往都是有噪声和随机性,大部分不是这种例子下完全是 1 和 0 的概率。

动态规划

在讨论今天的主题之前,我们先介绍其中一种方法,动态规划。其实他就是迭代更新,如下所示:

\[ V_{\bold{k+1}}(s)=\sum_{a \in A} \pi(a \mid s)\left(R(s, a)+\gamma \sum_{s^{\prime} \in S} p\left(s^{\prime} \mid s, a\right) V_{\bold{k}}\left(s^{\prime}\right)\right) \]

其中 \(V_{k}\) 是第 k 次迭代时状态 s 的价值函数估计,所以只是把 \(V\) 的下标给改了。这么说也可以,但意义其实发生了很大的变化。

这样不断迭代,直到最终收敛。有严格证明这样是能收敛的,但是太过学术和数学化,这里不展开讨论,可以去下方链接的最后查看:

看代码理解最好:

动态规划
import numpy as np
import gym
from gym import spaces

class MazeEnv(gym.Env):
    """
    自定义迷宫环境,继承自 gym.Env
    """
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super(MazeEnv, self).__init__()
        # 定义动作空间和状态空间
        self.action_space = spaces.Discrete(4)
        self.maze_size = (5, 5)
        self.observation_space = spaces.Box(low=0, high=4, shape=(2,), dtype=np.int32)

        # 定义迷宫(0 表示空地,-1 表示墙壁)
        self.maze = np.zeros(self.maze_size)
        self.maze[0, 3] = -1  # 墙壁位置
        self.maze[1, 1] = -1
        self.maze[1, 3] = -1
        self.maze[2, 1] = -1
        self.maze[3, 3] = -1
        self.maze[4, 1] = -1

        # 起点和终点
        self.start_pos = (0, 0)
        self.goal_pos = (0, 4)

        # 智能体初始位置
        self.agent_pos = self.start_pos

    def step(self, action):
        directions = {
            0: (-1, 0),  # 上
            1: (1, 0),   # 下
            2: (0, -1),  # 左
            3: (0, 1)    # 右
        }

        move = directions[action]
        new_pos = (self.agent_pos[0] + move[0], self.agent_pos[1] + move[1])

        # 检查新位置是否在迷宫范围内
        if (0 <= new_pos[0] < self.maze_size[0]) and (0 <= new_pos[1] < self.maze_size[1]):
            # 检查新位置是否是墙壁
            if self.maze[new_pos] != -1:
                self.agent_pos = new_pos  # 更新位置
                reward = -1
                done = False
            else:
                # 撞到墙壁
                reward = -5
                done = False
        else:
            # 越界
            reward = -5
            done = False

        # 检查是否到达终点
        if self.agent_pos == self.goal_pos:
            reward = 10
            done = True

        obs = np.array(self.agent_pos)
        info = {}
        return obs, reward, done, info

    def reset(self):
        self.agent_pos = self.start_pos
        return np.array(self.agent_pos)

    def render(self, mode='human'):
        maze_render = np.copy(self.maze)
        maze_render[self.agent_pos] = 2  # 智能体
        maze_render[self.start_pos] = 3  # 起点
        maze_render[self.goal_pos] = 4   # 终点
        symbol_map = {
            -1: 'W',  # 墙壁
            0: ' ',
            2: 'A',   # 智能体
            3: 'S',   # 起点
            4: 'G'    # 终点
        }
        print("\n".join(["".join([symbol_map[item] for item in row]) for row in maze_render]))
        print()

def policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=1000):
    """
    使用策略迭代求解迷宫。
    gamma: 折扣因子(小于1,避免负循环)
    theta: 收敛阈值
    max_iter: 策略迭代的最大迭代次数,防止死循环
    """

    # 1) 收集所有非墙壁状态
    states = []
    for r in range(env.maze_size[0]):
        for c in range(env.maze_size[1]):
            if env.maze[r, c] != -1: 
                states.append((r, c))

    actions = [0, 1, 2, 3]  # 上、下、左、右

    def step_in_model(state, action):
        # 终点不需要再动
        if state == env.goal_pos:
            return state, 0.0, True

        directions = {
            0: (-1, 0),
            1: (1, 0),
            2: (0, -1),
            3: (0, 1)
        }
        move = directions[action]
        new_state = (state[0] + move[0], state[1] + move[1])

        if not (0 <= new_state[0] < env.maze_size[0] and 0 <= new_state[1] < env.maze_size[1]):
            # 越界
            return state, -5, False
        if env.maze[new_state] == -1:
            # 撞墙
            return state, -5, False

        # 正常移动
        reward = -1
        done = False
        if new_state == env.goal_pos:
            reward = 10
            done = True
        return new_state, reward, done

    # 2) 初始化策略、价值函数
    pi = {}
    V = {}
    for s in states:
        if s == env.goal_pos:
            pi[s] = None
            V[s] = 0.0
        else:
            pi[s] = np.random.choice(actions)
            V[s] = 0.0

    # 3) 策略迭代
    iter_count = 0
    while True:
        iter_count += 1
        if iter_count > max_iter:
            print("超过最大迭代次数,提前退出,可能未完全收敛。")
            break

        # ========== (A) 策略评估 ==========
        while True:
            delta = 0
            for s in states:
                if s == env.goal_pos:
                    continue
                v_old = V[s]
                a = pi[s]
                s_next, r, done = step_in_model(s, a)
                if done:
                    V[s] = r
                else:
                    V[s] = r + gamma * V[s_next]
                delta = max(delta, abs(V[s] - v_old))
            if delta < theta:
                break

        # ========== (B) 策略改进 ==========
        policy_stable = True
        for s in states:
            if s == env.goal_pos:
                continue
            old_a = pi[s]

            best_a = None
            best_q = float('-inf')
            for a in actions:
                s_next, r, done = step_in_model(s, a)
                q_sa = r if done else (r + gamma * V[s_next])
                if q_sa > best_q:
                    best_q = q_sa
                    best_a = a

            pi[s] = best_a
            if best_a != old_a:
                policy_stable = False

        if policy_stable:
            print(f"策略在迭代 {iter_count} 次后稳定。")
            break

    return pi, V

if __name__ == "__main__":
    env = MazeEnv()
    policy, value = policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=200)

    # 查看价值函数与策略
    print("最终价值函数(部分):")
    for s in sorted(value.keys()):
        print(f"State {s}: V = {value[s]:.2f}")

    print("\n最终策略:")
    action_dict = {0: '↑', 1: '↓', 2: '←', 3: '→'}
    for s in sorted(policy.keys()):
        if policy[s] is None:
            print(f"{s} -> 终点")
        else:
            print(f"{s} -> {action_dict[policy[s]]}")

    # 用最终策略跑一遍环境
    obs = env.reset()
    env.render()
    done = False
    step_count = 0
    while not done and step_count < 50:
        s = tuple(obs)
        action = policy[s]
        obs, reward, done, _ = env.step(action)
        env.render()
        step_count += 1

    print("Episode finished!")

策略迭代和价值迭代

现在开始正式进入主题。上面,我们已经找到了一种「迭代式」计算价值函数的方法,但一定要注意,这有一个容易被忽略的前提,就是价值函数一定是在「某个策略」之下的,也就是说:上面的方法,只解决了「给定某个策略,我能计算出这种策略下的各状态价值函数」。

但还记得我们的初心吗?RL 最终的目标,就是找到一个最优策略啊:我们求价值函数,最终是为了找最优策略;而求解价值函数,前提又是有对应的策略……这不是鸡生蛋、蛋生鸡么?

下面就开始讨论两种真正的方式,我不做详细解释,请自行理解区别。

策略迭代

策略迭代的思想:在策略和价值函数之间来回迭代,逐步改进策略,直到收敛到最优策略。比较抽象,还是要看下面的具体步骤。两个步骤不断循环:

  1. 策略评估

在给定一个策略 \(\pi\)(初始可能就是一个随机策略)的情况下,计算该策略下各个状态的价值函数 \(V_{\pi}(s)\)\(Q_{\pi}(s, a)\)。方法就可以用上面的动态规划来做。

  1. 策略控制

上一步求解出的价值函数 \(V_{\pi}\),进行对策略的「贪婪化」,得到一个新的策略 \(\pi'\)

\[ \pi'(s)=\underset{a \in A}{\arg \max }~ Q_{\pi}(s, a) \]

这样不断迭代,直到最终收敛。这种算法的收敛性在于,每次策略改进都会使得策略变得不劣于之前的策略(即 \(V_{\pi'}(s) \geq V_{\pi}(s)\) 对所有状态 s 成立)。由于状态空间是有限的,策略的改进过程不可能无限进行,因此算法一定会在有限步内收敛到最优策略。

价值迭代

而价值迭代法融合了策略评估和改进步骤,直接迭代更新价值函数来逼近最优价值函数。

这里不细谈了,感觉太细节了,我的目的是了解轮廓而不是抠细节。总之理解好说,在策略未知的情况下,我们可以逐步迭代,最终算出最佳的策略和其对应的价值。

重新理解 \(\pi(a \mid s)\)

在上一章包括开头,我对 \(\pi(a \mid s)\) 的解释是:某一个状态应该采取什么样的动作概率。 的确是这样,但是例子举得不好,用的是天然的例子。比如加一减一,状态 n 的时候,概率分别是 0.5,这是天然的。

例子中我们没有一个目标,这个概率也是自然规律。而事实上任务都会有一个目标,这个 \(\pi(a \mid s)\) 是我们需要算出来的,也就是说不同策略下可能不一样,即 \(\pi_1(a \mid s)\)\(\pi_2(a \mid s)\) 可能不一样。

比如自动驾驶,有几种操作:加速、减速、匀速。一开始的策略中,我们可能认为这几种操作的概率是一样的。假设状态 s 表示当前道路状况良好,则:

\[ \pi(匀速 \mid s) = 0.33 \\ \pi(加速 \mid s) = 0.33 \\ \pi(减速 \mid s) = 0.33 \]

而经过上面所说的策略迭代和价值迭代,我们就会得出:道路状况良好的时候,匀速开就好:

\[ \pi_2(匀速 \mid s) = 0.8 \\ \pi_2(加速 \mid s) = 0.1 \\ \pi_2(减速 \mid s) = 0.1 \]

最终我们要采取的动作是匀速,即:

\[ a^{*} = \argmax_{a} \pi(a | s) \]

所以一个重点是:\(\pi(a | s)\) 输出的是一个概率!比如上面例子中,道路状况良好不代表就一定百分百是匀速,输出的始终是一个概率。只是最终选择的是概率最大的动作。

Comments