"""
QPPONet: Hybrid DQN-PPO Reinforcement Learning Agent for Battery Management
Algorithm pseudocode for the QPPONet agent proposed in:
"Optimizing Charge Discharge Cycles Using QPPONet-Enabled Hybrid Learning
Framework for Energy Management and Safety in Electric Vehicles"
MV Sujan Kumar, Ganesh Khekare
Accepted, Elsevier Energy Storage (in press)
NOTE: This is algorithmic pseudocode describing the method.
The full implementation (bms.py) will be released upon journal publication.
"""
# ── Hyperparameters ────────────────────────────────────────────────────────────
alpha = 0.6               # SOH degradation penalty weight
beta = 0.3                # energy efficiency reward weight
gamma = 0.1               # power loss penalty weight (reward weight; the TD discount is gamma_discount below)
DQN_lr = 1e-4
PPO_actor_lr = 3e-4
PPO_critic_lr = 3e-4
clip_eps = 0.2
entropy_coef = 0.01
value_loss_coef = 0.5
batch_size = 64
replay_capacity = 200_000
target_update_freq = 1_000
ppo_epochs = 4
ppo_minibatch = 64
lambda_q = 0.01           # Q-value blend weight in hybrid advantage
gamma_discount = 0.99     # TD discount factor used in the DQN target (illustrative value; distinct from the reward weight gamma)
PPO_rollout_length = 2048 # transitions collected before each on-policy PPO update (illustrative value)
random_seeds = [s1, s2, s3, s4, s5]
# ── Network Initialization ─────────────────────────────────────────────────────
Initialize DQN_Q network (weights θ_Q)
Initialize DQN target network θ_Q_target ← θ_Q
Initialize PPO actor network (parameters θ_actor)
Initialize PPO critic network (parameters θ_critic)
Initialize replay_buffer (capacity = replay_capacity)
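# ── Illustrative network sketch (PyTorch) ──────────────────────────────────────
# The following is an illustrative sketch of how the three networks could be
# instantiated, not the released implementation. The MLP layout, hidden width,
# and the placeholder state/action dimensions are assumptions made only so the
# sketch is self-contained; the paper's exact architectures are not given here.
import torch.nn as nn

def _mlp(in_dim, out_dim, hidden=128):
    # Small two-hidden-layer MLP shared by all heads in this sketch.
    return nn.Sequential(
        nn.Linear(in_dim, hidden), nn.ReLU(),
        nn.Linear(hidden, hidden), nn.ReLU(),
        nn.Linear(hidden, out_dim),
    )

STATE_DIM, N_ACTIONS = 8, 5                      # placeholder dimensions
Q_net = _mlp(STATE_DIM, N_ACTIONS)               # DQN Q-network (θ_Q)
Q_target = _mlp(STATE_DIM, N_ACTIONS)            # target network (θ_Q_target)
Q_target.load_state_dict(Q_net.state_dict())     # θ_Q_target ← θ_Q
actor_net = _mlp(STATE_DIM, N_ACTIONS)           # action logits (θ_actor)
critic_net = _mlp(STATE_DIM, 1)                  # state value V(s; θ_critic)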
# ── Reward Function ────────────────────────────────────────────────────────────
def compute_reward(state, action, next_state):
    """
    Multi-objective reward coupling the RL agent to the SOH estimator module.
    SOH_degradation is computed via the pre-trained Gradient Boosting estimator,
    not a fixed heuristic — this is the core integration mechanism.
    """
    P_loss = compute_power_loss(state, action, next_state)
    E_efficiency = compute_efficiency(state, action, next_state)
    SOH_degradation = estimate_soh_degradation(state, action, next_state)
    reward = beta * E_efficiency - (alpha * SOH_degradation) - (gamma * P_loss)
    return reward
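# ── Illustrative reward sketch (scikit-learn + NumPy) ──────────────────────────
# An illustrative sketch of how the SOH-degradation term could come from a
# pre-trained Gradient Boosting estimator, assuming flat NumPy state vectors of
# length STATE_DIM (from the network sketch above), a scalar discrete action,
# and scikit-learn's GradientBoostingRegressor. The feature layout and the
# dummy fitting data are placeholders so the sketch runs; in practice the
# estimator would be loaded already fitted on battery data.
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

_rng = np.random.default_rng(0)
soh_model = GradientBoostingRegressor().fit(       # stand-in for the pre-trained estimator
    _rng.random((64, 2 * STATE_DIM + 1)),          # dummy [state, action, next_state] features
    _rng.random(64) * 1e-3,                        # dummy per-step SOH losses
)

def estimate_soh_degradation_sketch(state, action, next_state):
    # Feature vector layout [state, action, next_state] is an assumption.
    features = np.concatenate([state, [action], next_state]).reshape(1, -1)
    return float(soh_model.predict(features)[0])

def compute_reward_sketch(state, action, next_state, e_efficiency, p_loss):
    # Same weighted combination as compute_reward above, with the SOH term
    # supplied by the learned estimator rather than a fixed heuristic.
    soh_deg = estimate_soh_degradation_sketch(state, action, next_state)
    return beta * e_efficiency - alpha * soh_deg - gamma * p_loss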
# ── Training Loop ──────────────────────────────────────────────────────────────
for seed in random_seeds:
    set_global_seed(seed)
    reset networks and replay_buffer
    total_steps = 0

    for episode in 1..N_episodes:
        state = env.reset()
        episode_buffer = []
        done = False

        while not done:
            # ── Action selection (PPO actor) ───────────────────────────────
            action, logp = sample_action_from_actor(θ_actor, state)

            # ── Environment step ───────────────────────────────────────────
            next_state, env_reward_components, done, info = env.step(action)
            reward = compute_reward(state, action, next_state)

            # ── Store transitions ──────────────────────────────────────────
            replay_buffer.add(state, action, reward, next_state, done)
            episode_buffer.append((state, action, reward, logp, next_state, done))
            state = next_state
            total_steps += 1
            # ── DQN update (off-policy, per step) ──────────────────────────
            if replay_buffer.size() > batch_size:
                batch = replay_buffer.sample(batch_size)
                # Terminal transitions carry no bootstrapped future value.
                target_q = reward_batch + gamma_discount * (1 - done_batch) * max_a_prime Q_target(next_state_batch, a')
                loss_Q = MSE(Q(state_batch, action_batch; θ_Q), target_q)
                θ_Q ← θ_Q - DQN_lr * ∇_{θ_Q} loss_Q
                if total_steps % target_update_freq == 0:
                    θ_Q_target ← θ_Q

            # ── PPO update (on-policy, per rollout) ────────────────────────
            if len(episode_buffer) >= PPO_rollout_length or done:
                states, actions, rewards, logps, next_states, dones = unzip(episode_buffer)
                returns = compute_discounted_returns(rewards, dones,
                                                     last_value=0 if done else V(next_states[-1], θ_critic))
                advantages = returns - V(states, θ_critic)

                # ── Hybrid advantage: blend PPO advantage with DQN Q-values ─
                q_values = Q(states, actions; θ_Q)
                hybrid_advantages = advantages + lambda_q * (q_values - V(states, θ_critic))

                # ── PPO epochs ──────────────────────────────────────────────
                for epoch in 1..ppo_epochs:
                    for minibatch in sample_minibatches(states, actions, logps,
                                                        returns, hybrid_advantages,
                                                        size=ppo_minibatch):
                        s_mb, a_mb, old_logp_mb, ret_mb, adv_mb = minibatch

                        # Actor loss (clipped surrogate objective)
                        new_logp = actor_logprob(θ_actor, s_mb, a_mb)
                        ratio = exp(new_logp - old_logp_mb)
                        clipped_ratio = clip(ratio, 1 - clip_eps, 1 + clip_eps)
                        actor_loss = -mean(min(ratio * adv_mb, clipped_ratio * adv_mb))

                        # Critic loss
                        value_pred = V(s_mb, θ_critic)
                        critic_loss = value_loss_coef * MSE(value_pred, ret_mb)

                        # Entropy bonus
                        entropy = mean(actor_entropy(θ_actor, s_mb))

                        # Total objective (for logging); actor and critic are
                        # updated separately below with their own learning rates.
                        loss = actor_loss + critic_loss - entropy_coef * entropy

                        # Parameter updates
                        θ_actor ← θ_actor - PPO_actor_lr * ∇_{θ_actor} (actor_loss - entropy_coef * entropy)
                        θ_critic ← θ_critic - PPO_critic_lr * ∇_{θ_critic} (critic_loss)

                episode_buffer = []

        log_episode_metrics(episode, seed, total_reward, capacity_loss, policy_stats)

    save_model_and_run_stats(seed)
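# ── Illustrative update-step sketch (PyTorch) ──────────────────────────────────
# An illustrative sketch of the per-batch update equations above, assuming a
# discrete action space and the placeholder networks Q_net / Q_target /
# actor_net / critic_net from the network sketch earlier in this file. Expected
# tensor shapes: states [B, STATE_DIM]; actions [B] (long); rewards, dones,
# old_logps and returns [B]. This is not the released bms.py implementation.
import torch
import torch.nn.functional as F
from torch.distributions import Categorical

def dqn_td_loss(states, actions, rewards, next_states, dones):
    # TD target uses the frozen target network and masks terminal transitions.
    with torch.no_grad():
        next_q = Q_target(next_states).max(dim=1).values
        target_q = rewards + gamma_discount * (1.0 - dones) * next_q
    q_sa = Q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    return F.mse_loss(q_sa, target_q)

def ppo_losses(states, actions, old_logps, returns):
    values = critic_net(states).squeeze(1)

    # Hybrid advantage: return-minus-value advantage nudged by the (detached)
    # DQN Q-value of the taken action, weighted by lambda_q.
    with torch.no_grad():
        q_sa = Q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    advantages = returns - values.detach()
    hybrid_adv = advantages + lambda_q * (q_sa - values.detach())

    # Clipped surrogate objective, value regression, and entropy bonus.
    dist = Categorical(logits=actor_net(states))
    new_logps = dist.log_prob(actions)
    ratio = torch.exp(new_logps - old_logps)
    clipped = torch.clamp(ratio, 1.0 - clip_eps, 1.0 + clip_eps)
    actor_loss = -torch.min(ratio * hybrid_adv, clipped * hybrid_adv).mean()
    critic_loss = value_loss_coef * F.mse_loss(values, returns)
    entropy = dist.entropy().mean()
    return actor_loss, critic_loss, entropy

# In a full training script the two loss functions would feed separate Adam
# optimizers using DQN_lr, PPO_actor_lr and PPO_critic_lr, mirroring the
# separate parameter updates in the pseudocode above.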