adapter: Add coordinator consistency check #21740

Merged

Changes from 1 commit
11 changes: 11 additions & 0 deletions src/adapter/src/coord/ddl.rs
@@ -15,6 +15,7 @@ use std::sync::Arc;
use std::time::Duration;

use fail::fail_point;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_compute_client::protocol::response::PeekResponse;
use mz_controller::clusters::ReplicaLocation;
@@ -442,6 +443,16 @@ impl Coordinator {
if !clusters_to_drop.is_empty() {
for cluster_id in clusters_to_drop {
self.controller.drop_cluster(cluster_id);
for id in self
.catalog()
.get_cluster(cluster_id)
.log_indexes
.values()
.cloned()
.collect_vec()
{
self.drop_compute_read_policy(&id);
}
Contributor

Two notes:

  1. I would move this to before we drop the cluster. It's possible that drop_cluster wipes out some metadata needed to update the read policies of the log indexes.
  2. You'll probably need to collect the log indexes before executing the catalog transaction: once that transaction has completed and the cluster has been dropped, the cluster no longer exists in the catalog.

Contributor Author

Thanks!

}
}

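The two review notes above are about ordering: capture the log-index IDs before the catalog transaction removes the cluster, and release the read policies before the controller drops the cluster. A rough sketch of that ordering follows; the local log_index_ids binding is hypothetical, the other names are taken from the diff, and the code that eventually landed may differ.

// Sketch only: collect the log-index IDs while the cluster still exists in
// the catalog, i.e. before the catalog transaction that drops it runs.
let log_index_ids: Vec<GlobalId> = clusters_to_drop
    .iter()
    .flat_map(|cluster_id| {
        self.catalog()
            .get_cluster(*cluster_id)
            .log_indexes
            .values()
            .cloned()
    })
    .collect();

// ... the catalog transaction performing the drops runs here ...

// Release the compute read policies before the controller drops the cluster,
// so drop_cluster cannot have wiped out metadata the cleanup still needs.
for id in &log_index_ids {
    self.drop_compute_read_policy(id);
}
for cluster_id in clusters_to_drop {
    self.controller.drop_cluster(cluster_id);
}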
25 changes: 0 additions & 25 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -128,8 +128,6 @@ struct DropOps {
ops: Vec<catalog::Op>,
dropped_active_db: bool,
dropped_active_cluster: bool,
/// List of ids which we have to drop from the compute read policy
to_drop_from_compute_read_policy: Vec<GlobalId>,
}

// A bundle of values returned from create_source_inner
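
For reference, after this change the DropOps bundle carries only the catalog ops and the two active-object flags; the to_drop_from_compute_read_policy field is the one being removed. Reconstructed from the hunk above:

struct DropOps {
    ops: Vec<catalog::Op>,
    dropped_active_db: bool,
    dropped_active_cluster: bool,
}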
@@ -1234,7 +1232,6 @@ impl Coordinator {
ops,
dropped_active_db,
dropped_active_cluster,
to_drop_from_compute_read_policy,
} = self.sequence_drop_common(session, drop_ids)?;

self.catalog_transact(Some(session), ops).await?;
@@ -1251,9 +1248,6 @@
name: session.vars().cluster().to_string(),
});
}
for id in to_drop_from_compute_read_policy {
self.drop_compute_read_policy(&id);
}
Ok(ExecuteResponse::DroppedObject(object_type))
}

@@ -1471,7 +1465,6 @@ impl Coordinator {
ops: drop_ops,
dropped_active_db,
dropped_active_cluster,
to_drop_from_compute_read_policy,
} = self.sequence_drop_common(session, plan.drop_ids)?;

let ops = privilege_revoke_ops
@@ -1491,9 +1484,6 @@
name: session.vars().cluster().to_string(),
});
}
for id in to_drop_from_compute_read_policy {
self.drop_compute_read_policy(&id);
}
Ok(ExecuteResponse::DroppedOwned)
}

@@ -1514,7 +1504,6 @@
// Dropping a database or a schema will revoke all default roles associated with that
// database or schema.
let mut default_privilege_revokes = BTreeSet::new();
let mut to_drop_from_compute_read_policy = Vec::new();
for id in &ids {
match id {
ObjectId::Database(id) => {
@@ -1530,14 +1519,6 @@
}
}
ObjectId::Cluster(id) => {
to_drop_from_compute_read_policy.extend(
self.catalog()
.get_cluster(*id)
.log_indexes
.values()
.cloned(),
);

if let Some(active_id) = self
.catalog()
.active_cluster(session)
@@ -1620,7 +1601,6 @@ impl Coordinator {
ops,
dropped_active_db,
dropped_active_cluster,
to_drop_from_compute_read_policy,
})
}

Expand Down Expand Up @@ -4219,7 +4199,6 @@ impl Coordinator {
mut ops,
dropped_active_db,
dropped_active_cluster,
to_drop_from_compute_read_policy,
} = self.sequence_drop_common(session, drops)?;

assert!(
@@ -4244,10 +4223,6 @@
.alter_collection(id, ingestion)
.await
.expect("altering collection after txn must succeed");

for id in to_drop_from_compute_read_policy {
self.drop_compute_read_policy(&id);
}
}
plan::AlterSourceAction::AddSubsourceExports {
subsources,