策略迭代和价值迭代
先让我们回顾一下贝尔曼方程:
我们的目标使其最大。
\(\pi(a \mid s)\) 和 \(p(s' \mid s,a)\) 的区别是什么?
我们按照加法为例子,现在可以加一减一,当前数字为 n 时称作状态 n。
\(\pi\) 表示状态 \(s\) 的时候取动作 \(a\) 的概率。比如状态 2 的时候,加一的概率是 0.5,减一的概率是 0.5。
\(p\) 表示状态 \(s\) 做了动作 \(a\) 之后成为状态 \(s'\) 的概率。比如状态 2 的时候,我们采取动作加一,那么变成状态 3 的概率是 1,变成状态 4 的概率是 0,当然现实情况往往都是有噪声和随机性,大部分不是这种例子下完全是 1 和 0 的概率。
动态规划
在讨论今天的主题之前,我们先介绍其中一种方法,动态规划。其实他就是迭代更新,如下所示:
其中 \(V_{k}\) 是第 k 次迭代时状态 s 的价值函数估计,所以只是把 \(V\) 的下标给改了。这么说也可以,但意义其实发生了很大的变化。
这样不断迭代,直到最终收敛。有严格证明这样是能收敛的,但是太过学术和数学化,这里不展开讨论,可以去下方链接的最后查看:
看代码理解最好:
动态规划
import numpy as np
import gym
from gym import spaces
class MazeEnv(gym.Env):
"""
自定义迷宫环境,继承自 gym.Env
"""
metadata = {'render.modes': ['human']}
def __init__(self):
super(MazeEnv, self).__init__()
# 定义动作空间和状态空间
self.action_space = spaces.Discrete(4)
self.maze_size = (5, 5)
self.observation_space = spaces.Box(low=0, high=4, shape=(2,), dtype=np.int32)
# 定义迷宫(0 表示空地,-1 表示墙壁)
self.maze = np.zeros(self.maze_size)
self.maze[0, 3] = -1 # 墙壁位置
self.maze[1, 1] = -1
self.maze[1, 3] = -1
self.maze[2, 1] = -1
self.maze[3, 3] = -1
self.maze[4, 1] = -1
# 起点和终点
self.start_pos = (0, 0)
self.goal_pos = (0, 4)
# 智能体初始位置
self.agent_pos = self.start_pos
def step(self, action):
directions = {
0: (-1, 0), # 上
1: (1, 0), # 下
2: (0, -1), # 左
3: (0, 1) # 右
}
move = directions[action]
new_pos = (self.agent_pos[0] + move[0], self.agent_pos[1] + move[1])
# 检查新位置是否在迷宫范围内
if (0 <= new_pos[0] < self.maze_size[0]) and (0 <= new_pos[1] < self.maze_size[1]):
# 检查新位置是否是墙壁
if self.maze[new_pos] != -1:
self.agent_pos = new_pos # 更新位置
reward = -1
done = False
else:
# 撞到墙壁
reward = -5
done = False
else:
# 越界
reward = -5
done = False
# 检查是否到达终点
if self.agent_pos == self.goal_pos:
reward = 10
done = True
obs = np.array(self.agent_pos)
info = {}
return obs, reward, done, info
def reset(self):
self.agent_pos = self.start_pos
return np.array(self.agent_pos)
def render(self, mode='human'):
maze_render = np.copy(self.maze)
maze_render[self.agent_pos] = 2 # 智能体
maze_render[self.start_pos] = 3 # 起点
maze_render[self.goal_pos] = 4 # 终点
symbol_map = {
-1: 'W', # 墙壁
0: ' ',
2: 'A', # 智能体
3: 'S', # 起点
4: 'G' # 终点
}
print("\n".join(["".join([symbol_map[item] for item in row]) for row in maze_render]))
print()
def policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=1000):
"""
使用策略迭代求解迷宫。
gamma: 折扣因子(小于1,避免负循环)
theta: 收敛阈值
max_iter: 策略迭代的最大迭代次数,防止死循环
"""
# 1) 收集所有非墙壁状态
states = []
for r in range(env.maze_size[0]):
for c in range(env.maze_size[1]):
if env.maze[r, c] != -1:
states.append((r, c))
actions = [0, 1, 2, 3] # 上、下、左、右
def step_in_model(state, action):
# 终点不需要再动
if state == env.goal_pos:
return state, 0.0, True
directions = {
0: (-1, 0),
1: (1, 0),
2: (0, -1),
3: (0, 1)
}
move = directions[action]
new_state = (state[0] + move[0], state[1] + move[1])
if not (0 <= new_state[0] < env.maze_size[0] and 0 <= new_state[1] < env.maze_size[1]):
# 越界
return state, -5, False
if env.maze[new_state] == -1:
# 撞墙
return state, -5, False
# 正常移动
reward = -1
done = False
if new_state == env.goal_pos:
reward = 10
done = True
return new_state, reward, done
# 2) 初始化策略、价值函数
pi = {}
V = {}
for s in states:
if s == env.goal_pos:
pi[s] = None
V[s] = 0.0
else:
pi[s] = np.random.choice(actions)
V[s] = 0.0
# 3) 策略迭代
iter_count = 0
while True:
iter_count += 1
if iter_count > max_iter:
print("超过最大迭代次数,提前退出,可能未完全收敛。")
break
# ========== (A) 策略评估 ==========
while True:
delta = 0
for s in states:
if s == env.goal_pos:
continue
v_old = V[s]
a = pi[s]
s_next, r, done = step_in_model(s, a)
if done:
V[s] = r
else:
V[s] = r + gamma * V[s_next]
delta = max(delta, abs(V[s] - v_old))
if delta < theta:
break
# ========== (B) 策略改进 ==========
policy_stable = True
for s in states:
if s == env.goal_pos:
continue
old_a = pi[s]
best_a = None
best_q = float('-inf')
for a in actions:
s_next, r, done = step_in_model(s, a)
q_sa = r if done else (r + gamma * V[s_next])
if q_sa > best_q:
best_q = q_sa
best_a = a
pi[s] = best_a
if best_a != old_a:
policy_stable = False
if policy_stable:
print(f"策略在迭代 {iter_count} 次后稳定。")
break
return pi, V
if __name__ == "__main__":
env = MazeEnv()
policy, value = policy_iteration(env, gamma=0.9, theta=1e-5, max_iter=200)
# 查看价值函数与策略
print("最终价值函数(部分):")
for s in sorted(value.keys()):
print(f"State {s}: V = {value[s]:.2f}")
print("\n最终策略:")
action_dict = {0: '↑', 1: '↓', 2: '←', 3: '→'}
for s in sorted(policy.keys()):
if policy[s] is None:
print(f"{s} -> 终点")
else:
print(f"{s} -> {action_dict[policy[s]]}")
# 用最终策略跑一遍环境
obs = env.reset()
env.render()
done = False
step_count = 0
while not done and step_count < 50:
s = tuple(obs)
action = policy[s]
obs, reward, done, _ = env.step(action)
env.render()
step_count += 1
print("Episode finished!")
策略迭代和价值迭代
现在开始正式进入主题。上面,我们已经找到了一种「迭代式」计算价值函数的方法,但一定要注意,这有一个容易被忽略的前提,就是价值函数一定是在「某个策略」之下的,也就是说:上面的方法,只解决了「给定某个策略,我能计算出这种策略下的各状态价值函数」。
但还记得我们的初心吗?RL 最终的目标,就是找到一个最优策略啊:我们求价值函数,最终是为了找最优策略;而求解价值函数,前提又是有对应的策略……这不是鸡生蛋、蛋生鸡么?
下面就开始讨论两种真正的方式,我不做详细解释,请自行理解区别。
策略迭代
策略迭代的思想:在策略和价值函数之间来回迭代,逐步改进策略,直到收敛到最优策略。比较抽象,还是要看下面的具体步骤。两个步骤不断循环:
- 策略评估
在给定一个策略 \(\pi\)(初始可能就是一个随机策略)的情况下,计算该策略下各个状态的价值函数 \(V_{\pi}(s)\) 和 \(Q_{\pi}(s, a)\)。方法就可以用上面的动态规划来做。
- 策略控制
上一步求解出的价值函数 \(V_{\pi}\),进行对策略的「贪婪化」,得到一个新的策略 \(\pi'\)。
这样不断迭代,直到最终收敛。这种算法的收敛性在于,每次策略改进都会使得策略变得不劣于之前的策略(即 \(V_{\pi'}(s) \geq V_{\pi}(s)\) 对所有状态 s 成立)。由于状态空间是有限的,策略的改进过程不可能无限进行,因此算法一定会在有限步内收敛到最优策略。
价值迭代
而价值迭代法融合了策略评估和改进步骤,直接迭代更新价值函数来逼近最优价值函数。
这里不细谈了,感觉太细节了,我的目的是了解轮廓而不是抠细节。总之理解好说,在策略未知的情况下,我们可以逐步迭代,最终算出最佳的策略和其对应的价值。
重新理解 \(\pi(a \mid s)\)
在上一章包括开头,我对 \(\pi(a \mid s)\) 的解释是:某一个状态应该采取什么样的动作概率。 的确是这样,但是例子举得不好,用的是天然的例子。比如加一减一,状态 n 的时候,概率分别是 0.5,这是天然的。
例子中我们没有一个目标,这个概率也是自然规律。而事实上任务都会有一个目标,这个 \(\pi(a \mid s)\) 是我们需要算出来的,也就是说不同策略下可能不一样,即 \(\pi_1(a \mid s)\) 和 \(\pi_2(a \mid s)\) 可能不一样。
比如自动驾驶,有几种操作:加速、减速、匀速。一开始的策略中,我们可能认为这几种操作的概率是一样的。假设状态 s 表示当前道路状况良好,则:
而经过上面所说的策略迭代和价值迭代,我们就会得出:道路状况良好的时候,匀速开就好:
最终我们要采取的动作是匀速,即:
所以一个重点是:\(\pi(a | s)\) 输出的是一个概率!比如上面例子中,道路状况良好不代表就一定百分百是匀速,输出的始终是一个概率。只是最终选择的是概率最大的动作。