Skip to content

Commit 625c04c

Browse files
committed
agent: fix decrypted HMAC keys in proxy_connectors and controlplane.rs
1 parent 2f7f986 commit 625c04c

File tree

4 files changed

+26
-34
lines changed

4 files changed

+26
-34
lines changed

crates/agent-sql/src/data_plane.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub async fn fetch_data_planes<'a, 'b>(
5656
ops_logs_name as "ops_logs_name: models::Collection",
5757
ops_stats_name as "ops_stats_name: models::Collection"
5858
from data_planes
59-
where (array_length($1::flowid[], 1) = 0 OR id in (select id from unnest($1::flowid[]) id))
59+
where (array_length($1::flowid[], 1) is null or id in (select id from unnest($1::flowid[]) id))
6060
or data_plane_name = $2
6161
"#,
6262
&data_plane_ids as &[Id],

crates/agent/src/api/snapshot.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,16 @@ async fn try_fetch(
540540
"fetched authorization snapshot",
541541
);
542542

543+
data_planes
544+
.iter_mut()
545+
.filter(|dp| decrypted_hmac_keys.contains_key(&dp.data_plane_name))
546+
.for_each(|dp| {
547+
dp.hmac_keys = decrypted_hmac_keys
548+
.get(&dp.data_plane_name)
549+
.unwrap()
550+
.clone()
551+
});
552+
543553
futures::future::try_join_all(
544554
data_planes
545555
.iter_mut()

crates/agent/src/main.rs

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -213,30 +213,7 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
213213
connectors,
214214
);
215215

216-
let mut data_planes =
217-
agent_sql::data_plane::fetch_data_planes(&pg_pool, Vec::new(), "", uuid::Uuid::nil())
218-
.await?;
219-
220-
futures::future::try_join_all(
221-
data_planes
222-
.iter_mut()
223-
.filter(|dp| {
224-
!dp.encrypted_hmac_keys
225-
.to_value()
226-
.as_object()
227-
.unwrap()
228-
.is_empty()
229-
})
230-
.map(|dp| agent::decrypt_hmac_keys(dp)),
231-
)
232-
.await?;
233-
234-
let decrypted_hmac_keys = Arc::new(RwLock::new(
235-
data_planes
236-
.iter()
237-
.map(|dp| (dp.data_plane_name.clone(), dp.hmac_keys.clone()))
238-
.collect(),
239-
));
216+
let decrypted_hmac_keys = Arc::new(RwLock::new(HashMap::new()));
240217

241218
tokio::spawn(refresh_decrypted_hmac_keys(
242219
pg_pool.clone(),
@@ -307,13 +284,10 @@ async fn refresh_decrypted_hmac_keys(
307284
const REFRESH_INTERVAL: chrono::TimeDelta = chrono::TimeDelta::seconds(60);
308285

309286
loop {
310-
let mut data_planes =
287+
let mut data_planes: Vec<_> =
311288
agent_sql::data_plane::fetch_data_planes(&pg_pool, Vec::new(), "", uuid::Uuid::nil())
312-
.await?;
313-
314-
futures::future::try_join_all(
315-
data_planes
316-
.iter_mut()
289+
.await?
290+
.into_iter()
317291
.filter(|dp| {
318292
!decrypted_hmac_keys
319293
.read()
@@ -327,6 +301,11 @@ async fn refresh_decrypted_hmac_keys(
327301
.unwrap()
328302
.is_empty()
329303
})
304+
.collect();
305+
306+
futures::future::try_join_all(
307+
data_planes
308+
.iter_mut()
330309
.map(|dp| agent::decrypt_hmac_keys(dp)),
331310
)
332311
.await?;

crates/agent/src/proxy_connectors.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,19 +215,22 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
215215
impl Future<Output = anyhow::Result<()>> + 'a,
216216
),
217217
)> {
218+
let mut dp = data_plane.clone();
219+
crate::decrypt_hmac_keys(&mut dp).await?;
220+
218221
let tables::DataPlane {
219222
reactor_address,
220223
hmac_keys,
221224
data_plane_fqdn,
222225
..
223-
} = data_plane;
226+
} = dp;
224227

225228
let mut metadata = gazette::Metadata::default();
226229

227230
metadata
228231
.signed_claims(
229232
proto_flow::capability::PROXY_CONNECTOR,
230-
data_plane_fqdn,
233+
&data_plane_fqdn,
231234
*CONNECTOR_TIMEOUT * 2,
232235
&hmac_keys,
233236
Default::default(),
@@ -241,7 +244,7 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
241244

242245
let mut proxy_client =
243246
proto_grpc::runtime::connector_proxy_client::ConnectorProxyClient::with_interceptor(
244-
gazette::dial_channel(reactor_address)?,
247+
gazette::dial_channel(&reactor_address)?,
245248
metadata.clone(),
246249
);
247250
let mut proxy_responses = proxy_client

0 commit comments

Comments
 (0)