Skip to content

Commit 14fe026

Browse files
committed
Insert samples using the native client
- Remove unneeded and error-prone `n_columns` and `n_rows` fields from the `Block` type. These are now methods that delegate to the actual column arrays, which are the source of truth anyway.
- Add methods for extracting fields from a sample as a set of data blocks, one per field table.
- Add methods to extract measurements from a sample as a block destined for one table. Take care to correctly construct missing samples, especially for histograms. Also implement the `FromBlock` trait for samples, to ensure we can extract the raw data as well. This is only used in tests, in this commit.
- Insert fields and measurements from a sample as a data block, using the native interface.
- Fix serde of UUIDs in native format, which doesn't match the documentation, at least for our current version of ClickHouse.
- Remove code serializing fields and measurements into JSON.
- Closes #6884
1 parent c4c4d96 commit 14fe026

File tree

19 files changed

+2307
-1105
lines changed

19 files changed

+2307
-1105
lines changed

oximeter/collector/src/bin/clickhouse-schema-updater.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use clap::Parser;
1313
use clap::Subcommand;
1414
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
1515
use omicron_common::address::CLICKHOUSE_TCP_PORT;
16-
use oximeter_db::model::OXIMETER_VERSION;
1716
use oximeter_db::Client;
17+
use oximeter_db::OXIMETER_VERSION;
1818
use slog::Drain;
1919
use slog::Level;
2020
use slog::LevelFilter;

oximeter/db/src/client/dbwrite.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,22 @@
99
use crate::client::Client;
1010
use crate::model;
1111
use crate::model::to_block::ToBlock as _;
12+
use crate::native::block::Block;
1213
use crate::Error;
1314
use camino::Utf8PathBuf;
1415
use oximeter::Sample;
1516
use oximeter::TimeseriesSchema;
1617
use slog::debug;
18+
use std::collections::btree_map::Entry;
1719
use std::collections::BTreeMap;
1820
use std::collections::BTreeSet;
1921

2022
#[derive(Debug)]
2123
pub(super) struct UnrolledSampleRows {
2224
/// The timeseries schema rows.
2325
pub new_schema: Vec<TimeseriesSchema>,
24-
/// The rows to insert in all the other tables, keyed by the table name.
25-
pub rows: BTreeMap<String, Vec<String>>,
26+
/// The blocks to insert in all the other tables, keyed by the table name.
27+
pub blocks: BTreeMap<String, Block>,
2628
}
2729

2830
/// A trait allowing a [`Client`] to write data into the timeseries database.
@@ -54,10 +56,10 @@ impl DbWrite for Client {
5456
/// Insert the given samples into the database.
5557
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
5658
debug!(self.log, "unrolling {} total samples", samples.len());
57-
let UnrolledSampleRows { new_schema, rows } =
59+
let UnrolledSampleRows { new_schema, blocks } =
5860
self.unroll_samples(samples).await;
5961
self.save_new_schema_or_remove(new_schema).await?;
60-
self.insert_unrolled_samples(rows).await
62+
self.insert_unrolled_samples(blocks).await
6163
}
6264

6365
/// Initialize the replicated telemetry database, creating tables as needed.
@@ -172,7 +174,7 @@ impl Client {
172174
samples: &[Sample],
173175
) -> UnrolledSampleRows {
174176
let mut seen_timeseries = BTreeSet::new();
175-
let mut rows = BTreeMap::new();
177+
let mut table_blocks = BTreeMap::new();
176178
let mut new_schema = BTreeMap::new();
177179

178180
for sample in samples.iter() {
@@ -200,48 +202,61 @@ impl Client {
200202
crate::timeseries_key(sample),
201203
);
202204
if !seen_timeseries.contains(&key) {
203-
for (table_name, table_rows) in model::unroll_field_rows(sample)
205+
for (table_name, block) in
206+
model::fields::extract_fields_as_block(sample)
204207
{
205-
rows.entry(table_name)
206-
.or_insert_with(Vec::new)
207-
.extend(table_rows);
208+
match table_blocks.entry(table_name) {
209+
Entry::Vacant(entry) => {
210+
entry.insert(block);
211+
}
212+
Entry::Occupied(mut entry) => entry
213+
.get_mut()
214+
.concat(block)
215+
.expect("All blocks for a table must match"),
216+
}
208217
}
209218
}
210219

211-
let (table_name, measurement_row) =
212-
model::unroll_measurement_row(sample);
213-
214-
rows.entry(table_name)
215-
.or_insert_with(Vec::new)
216-
.push(measurement_row);
220+
let (table_name, measurement_block) =
221+
model::measurements::extract_measurement_as_block(sample);
222+
match table_blocks.entry(table_name) {
223+
Entry::Vacant(entry) => {
224+
entry.insert(measurement_block);
225+
}
226+
Entry::Occupied(mut entry) => entry
227+
.get_mut()
228+
.concat(measurement_block)
229+
.expect("All blocks for a table must match"),
230+
}
217231

218232
seen_timeseries.insert(key);
219233
}
220234

221235
let new_schema = new_schema.into_values().collect();
222-
UnrolledSampleRows { new_schema, rows }
236+
UnrolledSampleRows { new_schema, blocks: table_blocks }
223237
}
224238

225239
// Insert unrolled sample rows into the corresponding tables.
226240
async fn insert_unrolled_samples(
227241
&self,
228-
rows: BTreeMap<String, Vec<String>>,
242+
blocks: BTreeMap<String, Block>,
229243
) -> Result<(), Error> {
230-
for (table_name, rows) in rows {
231-
let body = format!(
232-
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
244+
for (table_name, block) in blocks {
245+
let n_rows = block.n_rows();
246+
let query = format!(
247+
"INSERT INTO {db_name}.{table_name} FORMAT Native",
248+
db_name = crate::DATABASE_NAME,
233249
table_name = table_name,
234-
row_data = rows.join("\n")
235250
);
236251
// TODO-robustness We've verified the schema, so this is likely a transient failure.
237252
// But we may want to check the actual error condition, and, if possible, continue
238253
// inserting any remaining data.
239-
self.execute(body).await?;
254+
self.insert_native(&query, block).await?;
240255
debug!(
241256
self.log,
242257
"inserted rows into table";
243-
"n_rows" => rows.len(),
244-
"table_name" => table_name,
258+
"n_rows" => n_rows,
259+
"table_name" => &table_name,
245260
);
246261
}
247262

oximeter/db/src/client/mod.rs

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ impl Client {
10041004
returned an empty data block",
10051005
))
10061006
})
1007-
.map(|block| block.n_rows > 0)
1007+
.map(|block| block.n_rows() > 0)
10081008
})
10091009
}
10101010

@@ -1161,17 +1161,6 @@ impl Client {
11611161
result
11621162
}
11631163

1164-
// Execute a generic SQL statement.
1165-
//
1166-
// TODO-robustness This currently does no validation of the statement.
1167-
async fn execute<S>(&self, sql: S) -> Result<(), Error>
1168-
where
1169-
S: Into<String>,
1170-
{
1171-
self.execute_with_body(sql).await?;
1172-
Ok(())
1173-
}
1174-
11751164
// Execute a generic SQL statement, awaiting the response as text
11761165
//
11771166
// TODO-robustness This currently does no validation of the statement.
@@ -1281,14 +1270,14 @@ impl Client {
12811270
trace!(self.log, "no new timeseries schema in database");
12821271
return Ok(());
12831272
};
1284-
if data.n_rows == 0 {
1273+
if data.n_rows() == 0 {
12851274
trace!(self.log, "no new timeseries schema in database");
12861275
return Ok(());
12871276
}
12881277
trace!(
12891278
self.log,
12901279
"retrieved new timeseries schema";
1291-
"n_schema" => data.n_rows,
1280+
"n_schema" => data.n_rows(),
12921281
);
12931282
for new_schema in TimeseriesSchema::from_block(data)?.into_iter() {
12941283
schema.insert(new_schema.timeseries_name.clone(), new_schema);
@@ -3492,41 +3481,44 @@ mod tests {
34923481
// Insert a record from this datum.
34933482
const TIMESERIES_NAME: &str = "foo:bar";
34943483
const TIMESERIES_KEY: u64 = 101;
3495-
let (measurement_table, inserted_row) =
3496-
crate::model::unroll_measurement_row_impl(
3484+
let (measurement_table, inserted_block) =
3485+
crate::model::measurements::extract_measurement_as_block_impl(
34973486
TIMESERIES_NAME.to_string(),
34983487
TIMESERIES_KEY,
34993488
&measurement,
35003489
);
35013490
let insert_sql = format!(
3502-
"INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}",
3491+
"INSERT INTO {}.{} FORMAT Native ",
3492+
crate::DATABASE_NAME,
3493+
measurement_table,
35033494
);
3504-
println!("Inserted row: {}", inserted_row);
3495+
println!("Expected measurement: {:#?}", measurement);
3496+
println!("Inserted block: {:#?}", inserted_block);
35053497
client
3506-
.execute_native(&insert_sql)
3498+
.insert_native(&insert_sql, inserted_block)
35073499
.await
3508-
.expect("Failed to insert measurement row");
3500+
.expect("Failed to insert measurement block");
35093501

35103502
// Select it exactly back out.
35113503
let select_sql = format!(
3512-
"SELECT * FROM {} WHERE timestamp = '{}' FORMAT {};",
3504+
"SELECT * FROM {}.{} WHERE timestamp = '{}'",
3505+
crate::DATABASE_NAME,
35133506
measurement_table,
35143507
measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT),
3515-
crate::DATABASE_SELECT_FORMAT,
35163508
);
3517-
let body = client
3518-
.execute_with_body(select_sql)
3509+
let selected_block = client
3510+
.execute_with_block(&select_sql)
35193511
.await
35203512
.expect("Failed to select measurement row")
3521-
.1;
3522-
let (_, actual_row) = crate::model::parse_measurement_from_row(
3523-
&body,
3524-
measurement.datum_type(),
3525-
);
3526-
println!("Actual row: {actual_row:?}");
3513+
.data
3514+
.expect("Should have selected some data block");
3515+
let actual_measurements = Measurement::from_block(&selected_block)
3516+
.expect("Failed to extract measurement from block");
3517+
println!("Actual measurements: {actual_measurements:#?}");
3518+
assert_eq!(actual_measurements.len(), 1);
35273519
assert_eq!(
3528-
actual_row, measurement,
3529-
"Actual and expected measurement rows do not match"
3520+
actual_measurements[0], measurement,
3521+
"Actual and expected measurements do not match"
35303522
);
35313523
Ok(())
35323524
}

oximeter/db/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::path::PathBuf;
3535
use thiserror::Error;
3636

3737
mod client;
38-
pub mod model;
38+
pub(crate) mod model;
3939
pub mod native;
4040
#[cfg(any(feature = "oxql", test))]
4141
pub mod oxql;

0 commit comments

Comments
 (0)