Skip to content

Commit 53b77fd

Browse files
authored
Insert samples using the native client (#7013)
- Remove unneeded and error-prone `n_columns` and `n_rows` fields from the `Block` type. These are now methods that delegate to the actual column arrays, which are the source of truth anyway. - Add methods for extracting fields from a sample as a set of data blocks, one per field table. - Add methods to extract measurements from a sample as a block destined for one table. Take care to correctly construct missing samples, especially for histograms. Also implement the `FromBlock` trait for samples, to ensure we can extract the raw data as well. This is only used in tests, in this commit. - Insert fields and measurements from a sample as a data block, using the native interface. - Fix serde of UUIDs in native format, which doesn't match the documentation at least for our current version of ClickHouse. - Remove code serializing fields and measurements into JSON. - Closes #6884
1 parent 11667c1 commit 53b77fd

File tree

21 files changed

+2433
-1124
lines changed

21 files changed

+2433
-1124
lines changed

oximeter/collector/src/bin/clickhouse-schema-updater.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use clap::Parser;
1313
use clap::Subcommand;
1414
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
1515
use omicron_common::address::CLICKHOUSE_TCP_PORT;
16-
use oximeter_db::model::OXIMETER_VERSION;
1716
use oximeter_db::Client;
17+
use oximeter_db::OXIMETER_VERSION;
1818
use slog::Drain;
1919
use slog::Level;
2020
use slog::LevelFilter;

oximeter/db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ slog.workspace = true
4242
slog-async.workspace = true
4343
slog-dtrace.workspace = true
4444
slog-term.workspace = true
45+
strum.workspace = true
4546
thiserror.workspace = true
4647
tokio-util.workspace = true
4748
usdt.workspace = true

oximeter/db/src/client/dbwrite.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,22 @@
99
use crate::client::Client;
1010
use crate::model;
1111
use crate::model::to_block::ToBlock as _;
12+
use crate::native::block::Block;
1213
use crate::Error;
1314
use camino::Utf8PathBuf;
1415
use oximeter::Sample;
1516
use oximeter::TimeseriesSchema;
1617
use slog::debug;
18+
use std::collections::btree_map::Entry;
1719
use std::collections::BTreeMap;
1820
use std::collections::BTreeSet;
1921

2022
#[derive(Debug)]
2123
pub(super) struct UnrolledSampleRows {
2224
/// The timeseries schema rows.
2325
pub new_schema: Vec<TimeseriesSchema>,
24-
/// The rows to insert in all the other tables, keyed by the table name.
25-
pub rows: BTreeMap<String, Vec<String>>,
26+
/// The blocks to insert in all the other tables, keyed by the table name.
27+
pub blocks: BTreeMap<String, Block>,
2628
}
2729

2830
/// A trait allowing a [`Client`] to write data into the timeseries database.
@@ -54,10 +56,10 @@ impl DbWrite for Client {
5456
/// Insert the given samples into the database.
5557
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
5658
debug!(self.log, "unrolling {} total samples", samples.len());
57-
let UnrolledSampleRows { new_schema, rows } =
59+
let UnrolledSampleRows { new_schema, blocks } =
5860
self.unroll_samples(samples).await;
5961
self.save_new_schema_or_remove(new_schema).await?;
60-
self.insert_unrolled_samples(rows).await
62+
self.insert_unrolled_samples(blocks).await
6163
}
6264

6365
/// Initialize the replicated telemetry database, creating tables as needed.
@@ -172,7 +174,7 @@ impl Client {
172174
samples: &[Sample],
173175
) -> UnrolledSampleRows {
174176
let mut seen_timeseries = BTreeSet::new();
175-
let mut rows = BTreeMap::new();
177+
let mut table_blocks = BTreeMap::new();
176178
let mut new_schema = BTreeMap::new();
177179

178180
for sample in samples.iter() {
@@ -200,48 +202,61 @@ impl Client {
200202
crate::timeseries_key(sample),
201203
);
202204
if !seen_timeseries.contains(&key) {
203-
for (table_name, table_rows) in model::unroll_field_rows(sample)
205+
for (table_name, block) in
206+
model::fields::extract_fields_as_block(sample)
204207
{
205-
rows.entry(table_name)
206-
.or_insert_with(Vec::new)
207-
.extend(table_rows);
208+
match table_blocks.entry(table_name) {
209+
Entry::Vacant(entry) => {
210+
entry.insert(block);
211+
}
212+
Entry::Occupied(mut entry) => entry
213+
.get_mut()
214+
.concat(block)
215+
.expect("All blocks for a table must match"),
216+
}
208217
}
209218
}
210219

211-
let (table_name, measurement_row) =
212-
model::unroll_measurement_row(sample);
213-
214-
rows.entry(table_name)
215-
.or_insert_with(Vec::new)
216-
.push(measurement_row);
220+
let (table_name, measurement_block) =
221+
model::measurements::extract_measurement_as_block(sample);
222+
match table_blocks.entry(table_name) {
223+
Entry::Vacant(entry) => {
224+
entry.insert(measurement_block);
225+
}
226+
Entry::Occupied(mut entry) => entry
227+
.get_mut()
228+
.concat(measurement_block)
229+
.expect("All blocks for a table must match"),
230+
}
217231

218232
seen_timeseries.insert(key);
219233
}
220234

221235
let new_schema = new_schema.into_values().collect();
222-
UnrolledSampleRows { new_schema, rows }
236+
UnrolledSampleRows { new_schema, blocks: table_blocks }
223237
}
224238

225239
// Insert unrolled sample rows into the corresponding tables.
226240
async fn insert_unrolled_samples(
227241
&self,
228-
rows: BTreeMap<String, Vec<String>>,
242+
blocks: BTreeMap<String, Block>,
229243
) -> Result<(), Error> {
230-
for (table_name, rows) in rows {
231-
let body = format!(
232-
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
244+
for (table_name, block) in blocks {
245+
let n_rows = block.n_rows();
246+
let query = format!(
247+
"INSERT INTO {db_name}.{table_name} FORMAT Native",
248+
db_name = crate::DATABASE_NAME,
233249
table_name = table_name,
234-
row_data = rows.join("\n")
235250
);
236251
// TODO-robustness We've verified the schema, so this is likely a transient failure.
237252
// But we may want to check the actual error condition, and, if possible, continue
238253
// inserting any remaining data.
239-
self.execute(body).await?;
254+
self.insert_native(&query, block).await?;
240255
debug!(
241256
self.log,
242257
"inserted rows into table";
243-
"n_rows" => rows.len(),
244-
"table_name" => table_name,
258+
"n_rows" => n_rows,
259+
"table_name" => &table_name,
245260
);
246261
}
247262

oximeter/db/src/client/mod.rs

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ impl Client {
10041004
returned an empty data block",
10051005
))
10061006
})
1007-
.map(|block| block.n_rows > 0)
1007+
.map(|block| block.n_rows() > 0)
10081008
})
10091009
}
10101010

@@ -1123,15 +1123,28 @@ impl Client {
11231123
) -> Result<QueryResult, Error> {
11241124
trace!(
11251125
self.log,
1126-
"executing SQL query";
1126+
"inserting data";
11271127
"sql" => sql,
1128+
"n_rows" => block.n_rows(),
1129+
"n_columns" => block.n_columns(),
11281130
);
11291131
let mut handle = self.native_pool.claim().await?;
11301132
let id = usdt::UniqueId::new();
1131-
probes::sql__query__start!(|| (&id, &sql));
1132-
let result = handle.insert(sql, block).await.map_err(Error::from);
1133+
probes::sql__query__start!(|| (&id, sql));
1134+
let now = tokio::time::Instant::now();
1135+
let result = tokio::time::timeout(
1136+
self.request_timeout,
1137+
handle.insert(sql, block),
1138+
)
1139+
.await;
1140+
let elapsed = now.elapsed();
11331141
probes::sql__query__done!(|| (&id));
1134-
result
1142+
match result {
1143+
Ok(result) => result.map_err(Error::from),
1144+
Err(_) => Err(Error::DatabaseUnavailable(format!(
1145+
"SQL query timed out after {elapsed:?}"
1146+
))),
1147+
}
11351148
}
11361149

11371150
// Execute a generic SQL statement, using the native TCP interface.
@@ -1155,21 +1168,18 @@ impl Client {
11551168

11561169
let mut handle = self.native_pool.claim().await?;
11571170
let id = usdt::UniqueId::new();
1158-
probes::sql__query__start!(|| (&id, &sql));
1159-
let result = handle.query(sql).await.map_err(Error::from);
1171+
probes::sql__query__start!(|| (&id, sql));
1172+
let now = tokio::time::Instant::now();
1173+
let result =
1174+
tokio::time::timeout(self.request_timeout, handle.query(sql)).await;
1175+
let elapsed = now.elapsed();
11601176
probes::sql__query__done!(|| (&id));
1161-
result
1162-
}
1163-
1164-
// Execute a generic SQL statement.
1165-
//
1166-
// TODO-robustness This currently does no validation of the statement.
1167-
async fn execute<S>(&self, sql: S) -> Result<(), Error>
1168-
where
1169-
S: Into<String>,
1170-
{
1171-
self.execute_with_body(sql).await?;
1172-
Ok(())
1177+
match result {
1178+
Ok(result) => result.map_err(Error::from),
1179+
Err(_) => Err(Error::DatabaseUnavailable(format!(
1180+
"SQL query timed out after {elapsed:?}"
1181+
))),
1182+
}
11731183
}
11741184

11751185
// Execute a generic SQL statement, awaiting the response as text
@@ -1281,14 +1291,14 @@ impl Client {
12811291
trace!(self.log, "no new timeseries schema in database");
12821292
return Ok(());
12831293
};
1284-
if data.n_rows == 0 {
1294+
if data.n_rows() == 0 {
12851295
trace!(self.log, "no new timeseries schema in database");
12861296
return Ok(());
12871297
}
12881298
trace!(
12891299
self.log,
12901300
"retrieved new timeseries schema";
1291-
"n_schema" => data.n_rows,
1301+
"n_schema" => data.n_rows(),
12921302
);
12931303
for new_schema in TimeseriesSchema::from_block(data)?.into_iter() {
12941304
schema.insert(new_schema.timeseries_name.clone(), new_schema);
@@ -3517,41 +3527,44 @@ mod tests {
35173527
// Insert a record from this datum.
35183528
const TIMESERIES_NAME: &str = "foo:bar";
35193529
const TIMESERIES_KEY: u64 = 101;
3520-
let (measurement_table, inserted_row) =
3521-
crate::model::unroll_measurement_row_impl(
3530+
let (measurement_table, inserted_block) =
3531+
crate::model::measurements::extract_measurement_as_block_impl(
35223532
TIMESERIES_NAME.to_string(),
35233533
TIMESERIES_KEY,
35243534
&measurement,
35253535
);
35263536
let insert_sql = format!(
3527-
"INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}",
3537+
"INSERT INTO {}.{} FORMAT Native ",
3538+
crate::DATABASE_NAME,
3539+
measurement_table,
35283540
);
3529-
println!("Inserted row: {}", inserted_row);
3541+
println!("Expected measurement: {:#?}", measurement);
3542+
println!("Inserted block: {:#?}", inserted_block);
35303543
client
3531-
.execute_native(&insert_sql)
3544+
.insert_native(&insert_sql, inserted_block)
35323545
.await
3533-
.expect("Failed to insert measurement row");
3546+
.expect("Failed to insert measurement block");
35343547

35353548
// Select it exactly back out.
35363549
let select_sql = format!(
3537-
"SELECT * FROM {} WHERE timestamp = '{}' FORMAT {};",
3550+
"SELECT * FROM {}.{} WHERE timestamp = '{}'",
3551+
crate::DATABASE_NAME,
35383552
measurement_table,
35393553
measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT),
3540-
crate::DATABASE_SELECT_FORMAT,
35413554
);
3542-
let body = client
3543-
.execute_with_body(select_sql)
3555+
let selected_block = client
3556+
.execute_with_block(&select_sql)
35443557
.await
35453558
.expect("Failed to select measurement row")
3546-
.1;
3547-
let (_, actual_row) = crate::model::parse_measurement_from_row(
3548-
&body,
3549-
measurement.datum_type(),
3550-
);
3551-
println!("Actual row: {actual_row:?}");
3559+
.data
3560+
.expect("Should have selected some data block");
3561+
let actual_measurements = Measurement::from_block(&selected_block)
3562+
.expect("Failed to extract measurement from block");
3563+
println!("Actual measurements: {actual_measurements:#?}");
3564+
assert_eq!(actual_measurements.len(), 1);
35523565
assert_eq!(
3553-
actual_row, measurement,
3554-
"Actual and expected measurement rows do not match"
3566+
actual_measurements[0], measurement,
3567+
"Actual and expected measurements do not match"
35553568
);
35563569
Ok(())
35573570
}

oximeter/db/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::path::PathBuf;
3535
use thiserror::Error;
3636

3737
mod client;
38-
pub mod model;
38+
pub(crate) mod model;
3939
pub mod native;
4040
#[cfg(any(feature = "oxql", test))]
4141
pub mod oxql;

oximeter/db/src/model/columns.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
//! Constants used for common column names.
6+
7+
// Copyright 2024 Oxide Computer Company
8+
9+
pub const TIMESERIES_NAME: &str = "timeseries_name";
10+
pub const TIMESERIES_KEY: &str = "timeseries_key";
11+
pub const DATUM_TYPE: &str = "datum_type";
12+
pub const FIELDS_DOT_NAME: &str = "fields.name";
13+
pub const FIELDS_DOT_SOURCE: &str = "fields.source";
14+
pub const FIELDS_DOT_TYPE: &str = "fields.type";
15+
pub const CREATED: &str = "created";
16+
pub const START_TIME: &str = "start_time";
17+
pub const TIMESTAMP: &str = "timestamp";
18+
pub const FIELD_NAME: &str = "field_name";
19+
pub const FIELD_VALUE: &str = "field_value";
20+
pub const DATUM: &str = "datum";
21+
pub const BINS: &str = "bins";
22+
pub const COUNTS: &str = "counts";
23+
pub const MIN: &str = "min";
24+
pub const MAX: &str = "max";
25+
pub const SUM_OF_SAMPLES: &str = "sum_of_samples";
26+
pub const SQUARED_MEAN: &str = "squared_mean";
27+
pub const P50_MARKER_HEIGHTS: &str = "p50_marker_heights";
28+
pub const P50_MARKER_POSITIONS: &str = "p50_marker_positions";
29+
pub const P50_DESIRED_MARKER_POSITIONS: &str = "p50_desired_marker_positions";
30+
pub const P90_MARKER_HEIGHTS: &str = "p90_marker_heights";
31+
pub const P90_MARKER_POSITIONS: &str = "p90_marker_positions";
32+
pub const P90_DESIRED_MARKER_POSITIONS: &str = "p90_desired_marker_positions";
33+
pub const P99_MARKER_HEIGHTS: &str = "p99_marker_heights";
34+
pub const P99_MARKER_POSITIONS: &str = "p99_marker_positions";
35+
pub const P99_DESIRED_MARKER_POSITIONS: &str = "p99_desired_marker_positions";
36+
37+
/// Supported quantiles for histograms.
38+
#[derive(Clone, Copy, Debug, strum::EnumIter)]
39+
pub enum Quantile {
40+
P50,
41+
P90,
42+
P99,
43+
}
44+
45+
impl Quantile {
46+
/// Return the marker height column name.
47+
pub const fn marker_heights(self) -> &'static str {
48+
match self {
49+
Quantile::P50 => P50_MARKER_HEIGHTS,
50+
Quantile::P90 => P90_MARKER_HEIGHTS,
51+
Quantile::P99 => P99_MARKER_HEIGHTS,
52+
}
53+
}
54+
55+
/// Return the marker position column name.
56+
pub const fn marker_positions(self) -> &'static str {
57+
match self {
58+
Quantile::P50 => P50_MARKER_POSITIONS,
59+
Quantile::P90 => P90_MARKER_POSITIONS,
60+
Quantile::P99 => P99_MARKER_POSITIONS,
61+
}
62+
}
63+
64+
/// Return the desired marker position column name.
65+
pub const fn desired_marker_positions(self) -> &'static str {
66+
match self {
67+
Quantile::P50 => P50_DESIRED_MARKER_POSITIONS,
68+
Quantile::P90 => P90_DESIRED_MARKER_POSITIONS,
69+
Quantile::P99 => P99_DESIRED_MARKER_POSITIONS,
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)