Skip to content

Commit 8568f9d

Browse files
adapter: Add coordinator consistency check (#21740)
Piggybacking on #21717, adding consistency checks to the coordinator that get run in dev and during testdrive
1 parent 3901a23 commit 8568f9d

File tree

12 files changed

+186
-19
lines changed

12 files changed

+186
-19
lines changed

misc/python/materialize/cloudtest/k8s/environmentd.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ def __init__(self, namespace: str = DEFAULT_K8S_NAMESPACE) -> None:
5050
service_port = V1ServicePort(name="sql", port=6875)
5151
http_port = V1ServicePort(name="http", port=6876)
5252
internal_port = V1ServicePort(name="internal", port=6877)
53+
internal_http_port = V1ServicePort(name="internalhttp", port=6878)
5354
self.service = V1Service(
5455
api_version="v1",
5556
kind="Service",
5657
metadata=V1ObjectMeta(name="environmentd", labels={"app": "environmentd"}),
5758
spec=V1ServiceSpec(
5859
type="NodePort",
59-
ports=[service_port, internal_port, http_port],
60+
ports=[service_port, internal_port, http_port, internal_http_port],
6061
selector={"app": "environmentd"},
6162
),
6263
)
@@ -184,6 +185,7 @@ def args(self) -> list[str]:
184185
f"--adapter-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
185186
f"--storage-stash-url=postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=storage",
186187
"--internal-sql-listen-addr=0.0.0.0:6877",
188+
"--internal-http-listen-addr=0.0.0.0:6878",
187189
"--unsafe-mode",
188190
# cloudtest may be called upon to spin up older versions of
189191
# Materialize too! If you are adding a command-line option that is

src/adapter/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use crate::{AdapterError, AdapterNotice, ExecuteResponse};
117117

118118
mod builtin_table_updates;
119119
mod config;
120-
mod consistency;
120+
pub(crate) mod consistency;
121121
mod error;
122122
mod migrate;
123123

src/adapter/src/catalog/consistency.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use serde::Serialize;
2323

2424
use super::CatalogState;
2525

26-
#[derive(Debug, Default, Serialize)]
26+
#[derive(Debug, Default, Clone, Serialize, PartialEq)]
2727
pub struct CatalogInconsistencies {
2828
/// Inconsistencies found with internal fields, if any.
2929
internal_fields: Vec<InternalFieldsInconsistency>,
@@ -347,7 +347,7 @@ impl CatalogState {
347347
}
348348
}
349349

350-
#[derive(Debug, Serialize)]
350+
#[derive(Debug, Serialize, Clone, PartialEq)]
351351
enum InternalFieldsInconsistency {
352352
Database(String, DatabaseId),
353353
AmbientSchema(String, SchemaId),
@@ -356,7 +356,7 @@ enum InternalFieldsInconsistency {
356356
Role(String, RoleId),
357357
}
358358

359-
#[derive(Debug, Serialize)]
359+
#[derive(Debug, Serialize, Clone, PartialEq)]
360360
enum RoleInconsistency {
361361
Database(DatabaseId, RoleId),
362362
Schema(SchemaId, RoleId),
@@ -374,15 +374,15 @@ enum RoleInconsistency {
374374
},
375375
}
376376

377-
#[derive(Debug, Serialize)]
377+
#[derive(Debug, Serialize, Clone, PartialEq)]
378378
enum CommentInconsistency {
379379
/// A comment was found for an object that no longer exists.
380380
Dangling(CommentObjectId),
381381
/// A comment with a column position was found on a non-relation.
382382
NonRelation(CommentObjectId, usize),
383383
}
384384

385-
#[derive(Debug, Serialize)]
385+
#[derive(Debug, Serialize, Clone, PartialEq)]
386386
enum ObjectDependencyInconsistency {
387387
/// Object A uses Object B, but Object B does not exist.
388388
MissingUses {

src/adapter/src/client.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,21 @@ impl SessionClient {
590590
catalog.check_consistency()
591591
}
592592

593+
/// Checks the coordinator for internal consistency, returning a JSON object describing the
594+
/// inconsistencies, if there are any. This is a superset of checks that check_catalog performs,
595+
///
596+
/// No authorization is performed, so access to this function must be limited to internal
597+
/// servers or superusers.
598+
pub async fn check_coordinator(&mut self) -> Result<(), serde_json::Value> {
599+
self.send_without_session(|tx| Command::CheckConsistency { tx })
600+
.await
601+
.map_err(|inconsistencies| {
602+
serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
603+
serde_json::Value::String("failed to serialize inconsistencies".to_string())
604+
})
605+
})
606+
}
607+
593608
/// Tells the coordinator a statement has finished execution, in the cases
594609
/// where we have no other reason to communicate with the coordinator.
595610
pub fn retire_execute(
@@ -727,7 +742,8 @@ impl SessionClient {
727742
| Command::GetSystemVars { .. }
728743
| Command::SetSystemVars { .. }
729744
| Command::Terminate { .. }
730-
| Command::RetireExecute { .. } => {}
745+
| Command::RetireExecute { .. }
746+
| Command::CheckConsistency { .. } => {}
731747
};
732748
cmd
733749
});

src/adapter/src/command.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use uuid::Uuid;
4343

4444
use crate::catalog::Catalog;
4545
use crate::client::{ConnectionId, ConnectionIdType};
46+
use crate::coord::consistency::CoordinatorInconsistencies;
4647
use crate::coord::peek::PeekResponseUnary;
4748
use crate::coord::ExecuteContextExtra;
4849
use crate::error::AdapterError;
@@ -137,6 +138,10 @@ pub enum Command {
137138
data: ExecuteContextExtra,
138139
reason: StatementEndedExecutionReason,
139140
},
141+
142+
CheckConsistency {
143+
tx: oneshot::Sender<Result<(), CoordinatorInconsistencies>>,
144+
},
140145
}
141146

142147
impl Command {
@@ -151,7 +156,8 @@ impl Command {
151156
| Command::Terminate { .. }
152157
| Command::GetSystemVars { .. }
153158
| Command::SetSystemVars { .. }
154-
| Command::RetireExecute { .. } => None,
159+
| Command::RetireExecute { .. }
160+
| Command::CheckConsistency { .. } => None,
155161
}
156162
}
157163

@@ -166,7 +172,8 @@ impl Command {
166172
| Command::Terminate { .. }
167173
| Command::GetSystemVars { .. }
168174
| Command::SetSystemVars { .. }
169-
| Command::RetireExecute { .. } => None,
175+
| Command::RetireExecute { .. }
176+
| Command::CheckConsistency { .. } => None,
170177
}
171178
}
172179
}

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ pub(crate) mod timestamp_selection;
159159

160160
mod appends;
161161
mod command_handler;
162+
pub mod consistency;
162163
mod ddl;
163164
mod indexes;
164165
mod introspection;

src/adapter/src/coord/command_handler.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ impl Coordinator {
210210
catalog: self.owned_catalog(),
211211
});
212212
}
213+
214+
Command::CheckConsistency { tx } => {
215+
let _ = tx.send(self.check_consistency());
216+
}
213217
}
214218
}
215219

src/adapter/src/coord/consistency.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Internal consistency checks that validate invariants of [`Coordinator`].
11+
12+
use super::Coordinator;
13+
use crate::catalog::consistency::CatalogInconsistencies;
14+
use mz_repr::GlobalId;
15+
use serde::Serialize;
16+
17+
#[derive(Debug, Default, Serialize, PartialEq)]
18+
pub struct CoordinatorInconsistencies {
19+
/// Inconsistencies found in the catalog.
20+
catalog_inconsistencies: CatalogInconsistencies,
21+
/// Inconsistencies found in read capabilities.
22+
read_capabilities: Vec<ReadCapabilitiesInconsistency>,
23+
}
24+
25+
impl CoordinatorInconsistencies {
26+
pub fn is_empty(&self) -> bool {
27+
self.catalog_inconsistencies.is_empty() && self.read_capabilities.is_empty()
28+
}
29+
}
30+
31+
impl Coordinator {
32+
/// Checks the [`Coordinator`] to make sure we're internally consistent.
33+
pub fn check_consistency(&self) -> Result<(), CoordinatorInconsistencies> {
34+
let mut inconsistencies = CoordinatorInconsistencies::default();
35+
36+
if let Err(catalog_inconsistencies) = self.catalog().state().check_consistency() {
37+
inconsistencies.catalog_inconsistencies = catalog_inconsistencies;
38+
}
39+
40+
if let Err(read_capabilities) = self.check_read_capabilities() {
41+
inconsistencies.read_capabilities = read_capabilities;
42+
}
43+
44+
if inconsistencies.is_empty() {
45+
Ok(())
46+
} else {
47+
Err(inconsistencies)
48+
}
49+
}
50+
51+
/// # Invariants:
52+
///
53+
/// * Read capabilities should reference known objects.
54+
///
55+
fn check_read_capabilities(&self) -> Result<(), Vec<ReadCapabilitiesInconsistency>> {
56+
let mut read_capabilities_inconsistencies = Vec::new();
57+
for (gid, _) in &self.storage_read_capabilities {
58+
if self.catalog().try_get_entry(gid).is_none() {
59+
read_capabilities_inconsistencies
60+
.push(ReadCapabilitiesInconsistency::Storage(gid.clone()));
61+
}
62+
}
63+
for (gid, _) in &self.compute_read_capabilities {
64+
if !gid.is_transient() && self.catalog().try_get_entry(gid).is_none() {
65+
read_capabilities_inconsistencies
66+
.push(ReadCapabilitiesInconsistency::Compute(gid.clone()));
67+
}
68+
}
69+
70+
if read_capabilities_inconsistencies.is_empty() {
71+
Ok(())
72+
} else {
73+
Err(read_capabilities_inconsistencies)
74+
}
75+
}
76+
}
77+
78+
#[derive(Debug, Serialize, PartialEq, Eq)]
79+
enum ReadCapabilitiesInconsistency {
80+
Storage(GlobalId),
81+
Compute(GlobalId),
82+
}

src/adapter/src/coord/ddl.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ impl Coordinator {
124124
let mut update_metrics_retention = false;
125125
let mut update_secrets_caching_config = false;
126126
let mut update_cluster_scheduling_config = false;
127+
let mut log_indexes_to_drop = Vec::new();
127128

128129
for op in &ops {
129130
match op {
@@ -198,6 +199,13 @@ impl Coordinator {
198199
}
199200
catalog::Op::DropObject(ObjectId::Cluster(id)) => {
200201
clusters_to_drop.push(*id);
202+
log_indexes_to_drop.extend(
203+
self.catalog()
204+
.get_cluster(*id)
205+
.log_indexes
206+
.values()
207+
.cloned(),
208+
);
201209
}
202210
catalog::Op::DropObject(ObjectId::ClusterReplica((cluster_id, replica_id))) => {
203211
// Drop the cluster replica itself.
@@ -440,6 +448,11 @@ impl Coordinator {
440448
self.drop_replica(cluster_id, replica_id).await;
441449
}
442450
}
451+
if !log_indexes_to_drop.is_empty() {
452+
for id in log_indexes_to_drop {
453+
self.drop_compute_read_policy(&id);
454+
}
455+
}
443456
if !clusters_to_drop.is_empty() {
444457
for cluster_id in clusters_to_drop {
445458
self.controller.drop_cluster(cluster_id);
@@ -517,7 +530,7 @@ impl Coordinator {
517530

518531
// Note: It's important that we keep the function call inside macro, this way we only run
519532
// the consistency checks if sort assertions are enabled.
520-
mz_ore::soft_assert_eq!(self.catalog().check_consistency(), Ok(()));
533+
mz_ore::soft_assert_eq!(self.check_consistency(), Ok(()));
521534

522535
Ok(result)
523536
}

src/environmentd/src/http.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,10 @@ impl InternalHttpServer {
432432
"/api/catalog/check",
433433
routing::get(catalog::handle_catalog_check),
434434
)
435+
.route(
436+
"/api/coordinator/check",
437+
routing::get(catalog::handle_coordinator_check),
438+
)
435439
.route(
436440
"/api/internal-console",
437441
routing::get(|| async move {

src/environmentd/src/http/catalog.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,11 @@ pub async fn handle_catalog_check(mut client: AuthedClient) -> impl IntoResponse
3030
};
3131
(TypedHeader(ContentType::json()), response.to_string())
3232
}
33+
34+
pub async fn handle_coordinator_check(mut client: AuthedClient) -> impl IntoResponse {
35+
let response = match client.client.check_coordinator().await {
36+
Ok(_) => serde_json::Value::String("".to_string()),
37+
Err(inconsistencies) => serde_json::json!({ "err": inconsistencies }),
38+
};
39+
(TypedHeader(ContentType::json()), response.to_string())
40+
}

src/testdrive/src/action/sql.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ use std::ascii;
1111
use std::error::Error;
1212
use std::fmt::{self, Display, Formatter, Write as _};
1313
use std::io::{self, Write};
14-
use std::time::SystemTime;
14+
use std::time::{Duration, SystemTime};
1515

1616
use anyhow::{anyhow, bail, Context};
17+
use http::StatusCode;
1718
use md5::{Digest, Md5};
1819
use mz_ore::collections::CollectionExt;
1920
use mz_ore::retry::Retry;
@@ -136,18 +137,47 @@ pub async fn run_sql(mut cmd: SqlCommand, state: &mut State) -> Result<ControlFl
136137
| Statement::GrantRole { .. }
137138
| Statement::RevokePrivileges { .. }
138139
| Statement::RevokeRole { .. } => {
140+
let response = Retry::default()
141+
.max_duration(Duration::from_secs(3))
142+
.clamp_backoff(Duration::from_millis(500))
143+
.retry_async(|_| async {
144+
reqwest::get(&format!(
145+
"http://{}/api/coordinator/check",
146+
state.materialize_internal_http_addr,
147+
))
148+
.await
149+
.context("while getting response from coordinator check")
150+
})
151+
.await?;
152+
if response.status() == StatusCode::NOT_FOUND {
153+
tracing::info!(
154+
"not performing coordinator check because the endpoint doesn't exist"
155+
);
156+
} else {
157+
// 404 can happen if we're testing an older version of environmentd
158+
let inconsistencies = response
159+
.error_for_status()
160+
.context("response from coordinator check returned an error")?
161+
.text()
162+
.await
163+
.context("while getting text from coordinator check")?;
164+
let inconsistencies: serde_json::Value = serde_json::from_str(&inconsistencies)
165+
.with_context(|| {
166+
format!(
167+
"while parsing result from consistency check: {:?}",
168+
inconsistencies
169+
)
170+
})?;
171+
if inconsistencies != serde_json::json!("") {
172+
bail!("Internal catalog inconsistencies {inconsistencies:#?}");
173+
}
174+
}
175+
139176
let catalog_state = state
140177
.with_catalog_copy(|catalog| catalog.state().clone())
141178
.await
142179
.map_err(|e| anyhow!("failed to read on-disk catalog state: {e}"))?;
143180

144-
// Run internal consistency checks.
145-
if let Some(state) = &catalog_state {
146-
if let Err(inconsistencies) = state.check_consistency() {
147-
bail!("Internal catalog inconsistencies {inconsistencies:#?}");
148-
}
149-
}
150-
151181
// Check that our on-disk state matches the in-memory state.
152182
let disk_state =
153183
catalog_state.map(|state| state.dump().expect("state must be dumpable"));

0 commit comments

Comments
 (0)