Simple MPC
MPC에 대해 글을 써볼까.
0.
회사에서 제어를 위한 툴로서 강화학습과 MPC를 사용한다. 둘 모두 실시간 제어 즉 온라인 방식을 구현해내는 것은 쉬운 일이 아니지만 이것들을 제한적으로 사용하거나 오프라인 방식으로 활용하는 방안은 충분히 생각해볼 법하다.
작년 말에, 나의 실친이자 직장 선배인 K 책임님이 MPC 기능을 구현해놓았다. 나는 MPC가 무엇인지 공부할 엄두가 나지 않아 계속 미루고 있다가 최근에야 클로드와의 대화를 통해 MPC를 이해하게 되었다. 지극히 간단한 예제로 MPC를 코드로 구현해놓았으니 나는 이걸 simple MPC라고 부르겠다. 목적함수를 만들 때에도 군더더기 없이 MPC의 핵심 term만 남겨서 서술하려고 한다. 이것을 MPC라고 불러도 되는지는 잘 모르겠으나 어쨌든 MPC의 원리를 이해하기 위한 입문용으로는 괜찮지 않을까 하는 생각으로 글을 쓴다.
1. 모터 환경
모터에 전압을 가해 차의 속력을 변화시키는 문제를 생각하자. $s_t$를 $t$시점의 속력이라고 하고(speed) $v_t$를 $t$시점에서의 전압이라고 하자(voltage). $s_t$는 상태변수이자 목표변수의 역할을 하고 $v_t$는 제어변수가 된다. $t$는 이산적으로, 등간격의 시간만을 생각하자. 다시 말해, 일종의 decision process를 생각하는 것이다. 시스템 동역학은
\[s_{t+1}=0.8s_t+30v_t\]로 주어진다고 하자. 즉, 속력은 이전 속력에 의한 관성의 영향을 받고 거기에 모터에 걸려진 전압에 비례하게 속도가 상승한다. 이것은 선형적인 동역학이다.
이 모터 환경은 다음과 같은 Motor 클래스로 구현된다.
1
2
3
4
5
6
7
8
9
10
11
class Motor:
"""간단한 모터 시뮬레이터"""
def __init__(self):
self.speed = 0.0
def apply_voltage(self, voltage, noise_std=1.0):
"""전압을 주면 속도가 변함"""
# 간단한 식: 새속도 = 현재속도 * 0.8 + 전압 * 30 + noise
noise = np.random.normal(0, noise_std)
self.speed = self.speed + voltage * 30 - self.speed * 0.2 + noise
return self.speed
2. 데이터 생성
이 시스템 동역학은 알려져 있지 않다고 가정하자. 대신 다음과 같은 데이터가 주어져있다고 하자.
\[\mathcal D = \{(s_t,v_t,s_{t+1}):0\le t\lt200\}\]실제로는 $0\le t\lt 200$인 $t$에 대하여 전압 $v_t$를 0과 10 사이의 임의의 실수로 부여하는 식으로 가상의 데이터를 생성했다 ($s_0=0$)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
motor = Motor()
data = []
for i in range(200):
# 현재 속도
current_speed = motor.speed
# 랜덤 전압 (0~10V)
voltage = np.random.uniform(0, 10)
# 모터에 전압 주기
next_speed = motor.apply_voltage(voltage)
# 기록
data.append({
'current_speed': current_speed,
'voltage': voltage,
'next_speed': next_speed
})
df = pd.DataFrame(data).round(3)
df

위 코드에서 데이터 $\mathcal D$는 df로 표현되었다.
시간에 따른 속력과 전압을 나타내면 다음과 같이 나타난다.
전압이 높아지면 다음 순간에 속력이 높아짐을 확인할 수 있다.
(시인성을 위해 전압은 40을 곱하여 시각화했다.)
1
2
3
4
5
plt.figure(figsize=(15,3))
plt.plot(df['current_speed'], label='current_speed')
plt.plot(df['voltage']*40, label='voltage')
plt.legend()
plt.show()

3. 시스템 동역학 학습 (System Identification)
MPC를 위해 먼저 하는 것은 시스템 동역학을 학습하는 것이다. 즉
\[s_{t+1}=f(s_t,v_t)\]인 함수 $f$를 모델링하는 것이다.
만약 $f$가 일차함수면 이 MPC를 linear MPC라고 부르고 $f$가 신경망처럼 비선형함수면 nonlinear MPC라고 부르게 된다. 지금 이 포스트에서는 linear한 경우를 다루도록 하자. 아까 생성한 데이터를 통해 선형식으로 모델링하면
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def train_model(data):
"""데이터로 모델 학습"""
print("모델 학습 중...")
# X: [현재속도, 전압]
X = data[['current_speed', 'voltage']].values
# y: 다음속도
y = data['next_speed'].values
# 선형회귀로 학습
model = LinearRegression()
model.fit(X, y)
print("✅ 학습 완료!")
return model
model = train_model(df)
print(model.coef_.round(8), round(model.intercept_,8))
[ 0.80019128 29.95026486] 0.10796784
당연하게도 처음 가정한 시스템 동역학 식과 거의 일치하는 식이 도출된다.
\[s_{t+1}=f(s_t,v_t)=0.80019128 s_t+29.95026486v_t+0.10796784\]이 모델은 데이터로부터 우리가 추론해낸 시스템 동역학이다.
4. MPC 제어 (one step)
이제 MPC의 핵심부분에 대해 알아보자.
목표변수의 목표값을 1500으로, horizon을 10으로 두어 one step에 대하여만 MPC 제어를 시행한다고 해보자. 이것은, 향후 10번의 timestep동안 목표변수의 예측값 $\hat s_1$, $\cdots$, $\hat s_{10}$이 최대한 1500에 가깝도록하려면 10번동안의 제어변수값 $\hat v_0$, $\cdots$, $\hat v_9$이 얼마여야 하는지 결정하는 것을 말한다. 목표변수의 예측값들은 벡터 $\hat s=\left(\hat s_1,\cdots,\hat s_{10}\right)$로 표현하고 제어변수값들도 벡터 $v=\left(v_0,\cdots,v_9\right)$로 표현하자.
목표변수값(속력)들은 동역학식 $f$을 순차적으로 적용하여 예측된 속도이다 ; $\hat s_0=s_0$, $\hat s_{t+1}=f(\hat s_t,v_t)$ $(t=1,\cdots,9)$.
이것은 함수 predict_future로 구현된다.
이렇게 얻어진 속도들을 사용하여 목표함수를 다음과 같이 정할 수 있으며
\[\begin{align*} J(v) &=\sum_{t=1}^{10}\left(\hat s_t-1500\right)^2\\ &=\sum_{t=1}^{10}\left(f(\hat s_{t-1},v_{t-1})-1500\right)^2\\ \end{align*}\]이것은 함수 cost로 구현된다.
이 두 함수를 바탕으로 $J(v)$를 최소로 하는 $v$를 얻어낸다.
여기서 $J(v)$는 복잡할지언정 2차함수(quadratic function)에 속하므로 이 최적화 문제는 볼록 최적화(convex optimization)에 해당한다.
따라서 비교적 쉽게 풀리는 최적화 문제이다.
범용적이나 계산량이 많은 SLSQP로 이 문제를 풀 수도 있지만 여기서는 scipy.optimize.minimize의 기본 솔버인 L-BFGS-B를 사용하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
motor = Motor()
target_speed = 1500.0
current_speed = motor.speed
def predict_future(voltage_plan):
"""미래 10스텝 예측"""
speed = current_speed
speeds = []
for v in voltage_plan:
# 모델로 다음 속도 예측
speed = model.predict([[speed, v]])[0]
speeds.append(speed)
return speeds
def cost(voltage_plan):
"""비용 함수: 목표와 얼마나 차이나는지"""
future_speeds = predict_future(voltage_plan)
error = sum((s - target_speed)**2 for s in future_speeds)
return error
# 최적화: 10스텝 전압 계획
initial_plan = [5.0] * 10 # 초기값: 5V로 10번
bounds = [(0, 30)] * 10 # 전압 범위: 0~30V
result = minimize(cost, initial_plan, bounds=bounds)
print(result.x)
[30. 26.07073943 10.00339915 10.00324293 10.00360237 10.0034147
10.00345973 10.00324613 10.00364791 10.00353239]
그 결과로 얻어지는 $v^\ast=\text{argmin}_vJ(v)=(v^\ast_0,\cdots,v^\ast_9)$를 관찰하면
\[\begin{align*} v^\ast_0&\approx30\\ v^\ast_1&\approx26\\ v^\ast_2&\approx10\\ v^\ast_3&\approx10\\ &\vdots\\ v^\ast_9&\approx10\\ \end{align*}\]와 같다. 즉 처음에는 전압을 최대(30V)로 올려서 속도를 상승시킨 후 특정시점(t=2)부터는 전압을 10V로 주어 목표변수를 목표값에 수렴시킨다.
$v^\ast_2=v^\ast_3=\cdots=v^\ast_9\approx10$인 것은 동역학식
\[s_{t+1}=0.8s_t+30v_t\]의 양변에 $\lim_{t\to\infty}$를 씌워서 정상상태의 $s$, $v$ 사이의 관계식을 다음과 같이 구했을 때
\[\begin{align*} \lim_{t\to\infty}s_{t+1}&=0.8\lim_{t\to\infty}s_t+30\lim_{t\to\infty}v_t\\ s&=0.8s+30v\\ 0.2s&=30v\\ \end{align*}\]$s=1500$을 대입해 얻어지는 값 $v=10$과 정확히 일치한다.
이렇게, 향후 10번의 목표값을 맞추기 위한 전압값을 10개 추천했다. 하지만 horizon을 10으로 설정한 것은 단지 멀리 보기 위함이고 더 중요한 것은 당장 다음 스텝에서 얼마의 전압값(제어변수값)을 쓸 것이냐 하는 것이다 : $v^\ast_0=30$.
1
2
best_voltage = result.x[0]
print(best_voltage)
30.0
댓글남기기