Skip to content

persist: count compaction fast-path eligible reqs #18380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use mz_ore::task::spawn;
use mz_persist::location::Blob;
use mz_persist_types::codec_impls::VecU8Schema;
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot, TryAcquireError};
Expand Down Expand Up @@ -410,6 +410,31 @@ where
schemas: Schemas<K, V>,
) -> Result<CompactRes<T>, anyhow::Error> {
let () = Self::validate_req(&req)?;

// We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
// but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
// could be eligible for the optimization to better understand whether it's worth trying to
// reintroduce it.
let mut single_nonempty_batch = None;
for batch in &req.inputs {
if batch.len > 0 {
match single_nonempty_batch {
None => single_nonempty_batch = Some(batch),
Some(_previous_nonempty_batch) => {
single_nonempty_batch = None;
break;
}
}
}
}
if let Some(single_nonempty_batch) = single_nonempty_batch {
if single_nonempty_batch.runs.len() == 0
&& single_nonempty_batch.desc.since() != &Antichain::from_elem(T::minimum())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically we can only stop compacting it when the since is past the upper (otherwise we might get some consolidation from forwarding the timestamps), but this is probably close enough to give us a signal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we'd see that in practice, because the upper of the output will also get bumped each time through, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessarily. like the upper of the shard could be at 7 and we could end up compacting [0,2) and [2,4)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC the most common case of this was a mostly-empty shard that looks like [0, 1) (with data), followed by empty [1, 2), [2, 3), ... progress batches that didn't get the empty-batch shortcut. so each time compaction fires, it gets an input of all the batches, and so in practice the upper would advance each time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, good point. I don't think it would be correct to use the logic you have here to trigger the optimization, but it's certainly safe to use as an upper bound on the potential benefit of the technique

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fair -- I mostly want a ballpark on whether fast-path compactions are like, 1%, 10%, 50% of our writes. I'm related-ly curious about TimelyDataflow/differential-dataflow#277 which seems like it'd help address your point here without having to compact a batch that will never benefit from logical compaction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha, I was about to point you at that PR! last time I took Frank's temperature on it, he was pretty hesitant to make any scary changes to the DD Spine, but I think it's a pretty straightforward cherry-pick to apply it to our fork

{
metrics.compaction.fast_path_eligible.inc();
}
}

// compaction needs memory enough for at least 2 runs and 2 in-progress parts
assert!(cfg.compaction_memory_bound_bytes >= 4 * cfg.batch.blob_target_size);
// reserve space for the in-progress part to be held in-mem representation and columnar
Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ pub struct CompactionMetrics {
pub(crate) not_all_prefetched: IntCounter,
pub(crate) parts_prefetched: IntCounter,
pub(crate) parts_waited: IntCounter,
pub(crate) fast_path_eligible: IntCounter,

pub(crate) applied_exact_match: IntCounter,
pub(crate) applied_subset_match: IntCounter,
Expand Down Expand Up @@ -748,6 +749,10 @@ impl CompactionMetrics {
name: "mz_persist_compaction_parts_waited",
help: "count of compaction parts that had to be waited on",
)),
fast_path_eligible: registry.register(metric!(
name: "mz_persist_compaction_fast_path_eligible",
help: "count of compaction requests that could have used the fast-path optimization",
)),
applied_exact_match: registry.register(metric!(
name: "mz_persist_compaction_applied_exact_match",
help: "count of merge results that exactly replaced a SpineBatch",
Expand Down