Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit a8e6828

Browse files
Adding Dispute Participation Metrics (#6838)
* Added participation and queue sizes metrics
* First draft of all metric code
* Tests pass
* Changed Metrics to field on participation + queues
* fmt
* Improving naming
* Refactor, placing timer in ParticipationRequest
* fmt
* Final cleanup
* Revert "Final cleanup"
  This reverts commit 02e5608.
* Changing metric names
* Implementing Eq only for unit tests
* fmt
1 parent e9bf067 commit a8e6828

File tree

7 files changed

+171
-31
lines changed

7 files changed

+171
-31
lines changed

node/core/dispute-coordinator/src/initialized.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Initialized {
9696
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
9797

9898
let (participation_sender, participation_receiver) = mpsc::channel(1);
99-
let participation = Participation::new(participation_sender);
99+
let participation = Participation::new(participation_sender, metrics.clone());
100100
let highest_session = rolling_session_window.latest_session();
101101

102102
Self {
@@ -916,12 +916,17 @@ impl Initialized {
916916
} else {
917917
self.metrics.on_queued_best_effort_participation();
918918
}
919+
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
919920
let r = self
920921
.participation
921922
.queue_participation(
922923
ctx,
923924
priority,
924-
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
925+
ParticipationRequest::new(
926+
new_state.candidate_receipt().clone(),
927+
session,
928+
request_timer,
929+
),
925930
)
926931
.await;
927932
log_error(r)?;

node/core/dispute-coordinator/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
347347
?candidate_hash,
348348
"Found valid dispute, with no vote from us on startup - participating."
349349
);
350+
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
350351
participation_requests.push((
351352
ParticipationPriority::with_priority_if(is_included),
352353
ParticipationRequest::new(
353354
vote_state.votes().candidate_receipt.clone(),
354355
session,
356+
request_timer,
355357
),
356358
));
357359
}

node/core/dispute-coordinator/src/metrics.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ struct MetricsInner {
3232
vote_cleanup_time: prometheus::Histogram,
3333
/// Number of refrained participations.
3434
refrained_participations: prometheus::Counter<prometheus::U64>,
35+
/// Distribution of participation durations.
36+
participation_durations: prometheus::Histogram,
37+
/// Measures the duration of the full participation pipeline: From when
38+
/// a participation request is first queued to when participation in the
39+
/// requested dispute is complete.
40+
participation_pipeline_durations: prometheus::Histogram,
41+
/// Size of participation priority queue
42+
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
43+
/// Size of participation best effort queue
44+
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
3545
}
3646

3747
/// Candidate validation metrics.
@@ -96,6 +106,36 @@ impl Metrics {
96106
metrics.refrained_participations.inc();
97107
}
98108
}
109+
110+
/// Provide a timer for participation durations which updates on drop.
111+
pub(crate) fn time_participation(
112+
&self,
113+
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
114+
self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer())
115+
}
116+
117+
/// Provide a timer for participation pipeline durations which updates on drop.
118+
pub(crate) fn time_participation_pipeline(
119+
&self,
120+
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
121+
self.0
122+
.as_ref()
123+
.map(|metrics| metrics.participation_pipeline_durations.start_timer())
124+
}
125+
126+
/// Set the priority_queue_size metric
127+
pub fn report_priority_queue_size(&self, size: u64) {
128+
if let Some(metrics) = &self.0 {
129+
metrics.participation_priority_queue_size.set(size);
130+
}
131+
}
132+
133+
/// Set the best_effort_queue_size metric
134+
pub fn report_best_effort_queue_size(&self, size: u64) {
135+
if let Some(metrics) = &self.0 {
136+
metrics.participation_best_effort_queue_size.set(size);
137+
}
138+
}
99139
}
100140

101141
impl metrics::Metrics for Metrics {
@@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
163203
))?,
164204
registry,
165205
)?,
206+
participation_durations: prometheus::register(
207+
prometheus::Histogram::with_opts(
208+
prometheus::HistogramOpts::new(
209+
"polkadot_parachain_dispute_participation_durations",
210+
"Time spent within fn Participation::participate",
211+
)
212+
)?,
213+
registry,
214+
)?,
215+
participation_pipeline_durations: prometheus::register(
216+
prometheus::Histogram::with_opts(
217+
prometheus::HistogramOpts::new(
218+
"polkadot_parachain_dispute_participation_pipeline_durations",
219+
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
220+
)
221+
)?,
222+
registry,
223+
)?,
224+
participation_priority_queue_size: prometheus::register(
225+
prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size",
226+
"Number of disputes waiting for local participation in the priority queue.")?,
227+
registry,
228+
)?,
229+
participation_best_effort_queue_size: prometheus::register(
230+
prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size",
231+
"Number of disputes waiting for local participation in the best effort queue.")?,
232+
registry,
233+
)?,
166234
};
167235
Ok(Metrics(Some(metrics)))
168236
}

node/core/dispute-coordinator/src/participation/mod.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ mod queues;
4848
use queues::Queues;
4949
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
5050

51+
use crate::metrics::Metrics;
52+
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;
53+
5154
/// How many participation processes do we want to run in parallel the most.
5255
///
5356
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
@@ -71,6 +74,8 @@ pub struct Participation {
7174
worker_sender: WorkerMessageSender,
7275
/// Some recent block for retrieving validation code from chain.
7376
recent_block: Option<(BlockNumber, Hash)>,
77+
/// Metrics handle cloned from Initialized
78+
metrics: Metrics,
7479
}
7580

7681
/// Message from worker tasks.
@@ -135,12 +140,13 @@ impl Participation {
135140
/// The passed in sender will be used by background workers to communicate back their results.
136141
/// The calling context should make sure to call `Participation::on_worker_message()` for the
137142
/// received messages.
138-
pub fn new(sender: WorkerMessageSender) -> Self {
143+
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
139144
Self {
140145
running_participations: HashSet::new(),
141-
queue: Queues::new(),
146+
queue: Queues::new(metrics.clone()),
142147
worker_sender: sender,
143148
recent_block: None,
149+
metrics,
144150
}
145151
}
146152

@@ -253,11 +259,19 @@ impl Participation {
253259
req: ParticipationRequest,
254260
recent_head: Hash,
255261
) -> FatalResult<()> {
262+
let participation_timer = self.metrics.time_participation();
256263
if self.running_participations.insert(*req.candidate_hash()) {
257264
let sender = ctx.sender().clone();
258265
ctx.spawn(
259266
"participation-worker",
260-
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
267+
participate(
268+
self.worker_sender.clone(),
269+
sender,
270+
recent_head,
271+
req,
272+
participation_timer,
273+
)
274+
.boxed(),
261275
)
262276
.map_err(FatalError::SpawnFailed)?;
263277
}
@@ -269,7 +283,8 @@ async fn participate(
269283
mut result_sender: WorkerMessageSender,
270284
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
271285
block_hash: Hash,
272-
req: ParticipationRequest,
286+
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
287+
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
273288
) {
274289
#[cfg(test)]
275290
// Hack for tests, so we get recovery messages not too early.

node/core/dispute-coordinator/src/participation/queues/mod.rs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
1616

17-
use std::{cmp::Ordering, collections::BTreeMap};
17+
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
1818

1919
use futures::channel::oneshot;
2020
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -25,6 +25,9 @@ use crate::{
2525
LOG_TARGET,
2626
};
2727

28+
use crate::metrics::Metrics;
29+
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;
30+
2831
#[cfg(test)]
2932
mod tests;
3033

@@ -56,14 +59,18 @@ pub struct Queues {
5659

5760
/// Priority queue.
5861
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
62+
63+
/// Handle for recording queues data in metrics
64+
metrics: Metrics,
5965
}
6066

6167
/// A dispute participation request that can be queued.
62-
#[derive(Debug, PartialEq, Eq, Clone)]
68+
#[derive(Debug, Clone)]
6369
pub struct ParticipationRequest {
6470
candidate_hash: CandidateHash,
6571
candidate_receipt: CandidateReceipt,
6672
session: SessionIndex,
73+
_request_timer: Arc<Option<prometheus::HistogramTimer>>, // Sends metric data when request is dropped
6774
}
6875

6976
/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
@@ -107,8 +114,17 @@ pub enum QueueError {
107114

108115
impl ParticipationRequest {
109116
/// Create a new `ParticipationRequest` to be queued.
110-
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
111-
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
117+
pub fn new(
118+
candidate_receipt: CandidateReceipt,
119+
session: SessionIndex,
120+
request_timer: Arc<Option<prometheus::HistogramTimer>>,
121+
) -> Self {
122+
Self {
123+
candidate_hash: candidate_receipt.hash(),
124+
candidate_receipt,
125+
session,
126+
_request_timer: request_timer,
127+
}
112128
}
113129

114130
pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
@@ -126,10 +142,29 @@ impl ParticipationRequest {
126142
}
127143
}
128144

145+
// We want to compare participation requests in unit tests, so we
146+
// only implement Eq for tests.
147+
#[cfg(test)]
148+
impl PartialEq for ParticipationRequest {
149+
fn eq(&self, other: &Self) -> bool {
150+
let ParticipationRequest {
151+
candidate_receipt,
152+
candidate_hash,
153+
session: _session,
154+
_request_timer,
155+
} = self;
156+
candidate_receipt == other.candidate_receipt() &&
157+
candidate_hash == other.candidate_hash() &&
158+
self.session == other.session()
159+
}
160+
}
161+
#[cfg(test)]
162+
impl Eq for ParticipationRequest {}
163+
129164
impl Queues {
130165
/// Create new `Queues`.
131-
pub fn new() -> Self {
132-
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
166+
pub fn new(metrics: Metrics) -> Self {
167+
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), metrics }
133168
}
134169

135170
/// Will put message in queue, either priority or best effort depending on priority.
@@ -154,9 +189,14 @@ impl Queues {
154189
/// First the priority queue is considered and then the best effort one.
155190
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
156191
if let Some(req) = self.pop_priority() {
192+
self.metrics.report_priority_queue_size(self.priority.len() as u64);
193+
return Some(req.1)
194+
}
195+
if let Some(req) = self.pop_best_effort() {
196+
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
157197
return Some(req.1)
158198
}
159-
self.pop_best_effort().map(|d| d.1)
199+
None
160200
}
161201

162202
/// Reprioritizes any participation requests pertaining to the
@@ -180,6 +220,9 @@ impl Queues {
180220
}
181221
if let Some(request) = self.best_effort.remove(&comparator) {
182222
self.priority.insert(comparator, request);
223+
// Report changes to both queue sizes
224+
self.metrics.report_priority_queue_size(self.priority.len() as u64);
225+
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
183226
}
184227
Ok(())
185228
}
@@ -197,6 +240,8 @@ impl Queues {
197240
// Remove any best effort entry:
198241
self.best_effort.remove(&comparator);
199242
self.priority.insert(comparator, req);
243+
self.metrics.report_priority_queue_size(self.priority.len() as u64);
244+
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
200245
} else {
201246
if self.priority.contains_key(&comparator) {
202247
// The candidate is already in priority queue - don't
@@ -207,6 +252,7 @@ impl Queues {
207252
return Err(QueueError::BestEffortFull)
208253
}
209254
self.best_effort.insert(comparator, req);
255+
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
210256
}
211257
Ok(())
212258
}

0 commit comments

Comments
 (0)