Skip to content

Insert samples using the native client #7013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oximeter/collector/src/bin/clickhouse-schema-updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use clap::Parser;
use clap::Subcommand;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use oximeter_db::OXIMETER_VERSION;
use slog::Drain;
use slog::Level;
use slog::LevelFilter;
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ slog.workspace = true
slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
usdt.workspace = true
Expand Down
63 changes: 39 additions & 24 deletions oximeter/db/src/client/dbwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@
use crate::client::Client;
use crate::model;
use crate::model::to_block::ToBlock as _;
use crate::native::block::Block;
use crate::Error;
use camino::Utf8PathBuf;
use oximeter::Sample;
use oximeter::TimeseriesSchema;
use slog::debug;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::BTreeSet;

#[derive(Debug)]
pub(super) struct UnrolledSampleRows {
/// The timeseries schema rows.
pub new_schema: Vec<TimeseriesSchema>,
/// The rows to insert in all the other tables, keyed by the table name.
pub rows: BTreeMap<String, Vec<String>>,
/// The blocks to insert in all the other tables, keyed by the table name.
pub blocks: BTreeMap<String, Block>,
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
Expand Down Expand Up @@ -54,10 +56,10 @@ impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
let UnrolledSampleRows { new_schema, blocks } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
self.insert_unrolled_samples(blocks).await
}

/// Initialize the replicated telemetry database, creating tables as needed.
Expand Down Expand Up @@ -172,7 +174,7 @@ impl Client {
samples: &[Sample],
) -> UnrolledSampleRows {
let mut seen_timeseries = BTreeSet::new();
let mut rows = BTreeMap::new();
let mut table_blocks = BTreeMap::new();
let mut new_schema = BTreeMap::new();

for sample in samples.iter() {
Expand Down Expand Up @@ -200,48 +202,61 @@ impl Client {
crate::timeseries_key(sample),
);
if !seen_timeseries.contains(&key) {
for (table_name, table_rows) in model::unroll_field_rows(sample)
for (table_name, block) in
model::fields::extract_fields_as_block(sample)
{
rows.entry(table_name)
.or_insert_with(Vec::new)
.extend(table_rows);
match table_blocks.entry(table_name) {
Entry::Vacant(entry) => {
entry.insert(block);
}
Entry::Occupied(mut entry) => entry
.get_mut()
.concat(block)
.expect("All blocks for a table must match"),
}
}
}

let (table_name, measurement_row) =
model::unroll_measurement_row(sample);

rows.entry(table_name)
.or_insert_with(Vec::new)
.push(measurement_row);
let (table_name, measurement_block) =
model::measurements::extract_measurement_as_block(sample);
match table_blocks.entry(table_name) {
Entry::Vacant(entry) => {
entry.insert(measurement_block);
}
Entry::Occupied(mut entry) => entry
.get_mut()
.concat(measurement_block)
.expect("All blocks for a table must match"),
}

seen_timeseries.insert(key);
}

let new_schema = new_schema.into_values().collect();
UnrolledSampleRows { new_schema, rows }
UnrolledSampleRows { new_schema, blocks: table_blocks }
}

// Insert unrolled sample rows into the corresponding tables.
async fn insert_unrolled_samples(
&self,
rows: BTreeMap<String, Vec<String>>,
blocks: BTreeMap<String, Block>,
) -> Result<(), Error> {
for (table_name, rows) in rows {
let body = format!(
"INSERT INTO {table_name} FORMAT JSONEachRow\n{row_data}\n",
for (table_name, block) in blocks {
let n_rows = block.n_rows();
let query = format!(
"INSERT INTO {db_name}.{table_name} FORMAT Native",
db_name = crate::DATABASE_NAME,
table_name = table_name,
row_data = rows.join("\n")
);
// TODO-robustness We've verified the schema, so this is likely a transient failure.
// But we may want to check the actual error condition, and, if possible, continue
// inserting any remaining data.
self.execute(body).await?;
self.insert_native(&query, block).await?;
debug!(
self.log,
"inserted rows into table";
"n_rows" => rows.len(),
"table_name" => table_name,
"n_rows" => n_rows,
"table_name" => &table_name,
);
}

Expand Down
91 changes: 52 additions & 39 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ impl Client {
returned an empty data block",
))
})
.map(|block| block.n_rows > 0)
.map(|block| block.n_rows() > 0)
})
}

Expand Down Expand Up @@ -1123,15 +1123,28 @@ impl Client {
) -> Result<QueryResult, Error> {
trace!(
self.log,
"executing SQL query";
"inserting data";
"sql" => sql,
"n_rows" => block.n_rows(),
"n_columns" => block.n_columns(),
);
let mut handle = self.native_pool.claim().await?;
let id = usdt::UniqueId::new();
probes::sql__query__start!(|| (&id, &sql));
let result = handle.insert(sql, block).await.map_err(Error::from);
probes::sql__query__start!(|| (&id, sql));
let now = tokio::time::Instant::now();
let result = tokio::time::timeout(
self.request_timeout,
handle.insert(sql, block),
)
.await;
let elapsed = now.elapsed();
probes::sql__query__done!(|| (&id));
result
match result {
Ok(result) => result.map_err(Error::from),
Err(_) => Err(Error::DatabaseUnavailable(format!(
"SQL query timed out after {elapsed:?}"
))),
}
}

// Execute a generic SQL statement, using the native TCP interface.
Expand All @@ -1155,21 +1168,18 @@ impl Client {

let mut handle = self.native_pool.claim().await?;
let id = usdt::UniqueId::new();
probes::sql__query__start!(|| (&id, &sql));
let result = handle.query(sql).await.map_err(Error::from);
probes::sql__query__start!(|| (&id, sql));
let now = tokio::time::Instant::now();
let result =
tokio::time::timeout(self.request_timeout, handle.query(sql)).await;
let elapsed = now.elapsed();
probes::sql__query__done!(|| (&id));
result
}

// Execute a generic SQL statement.
//
// TODO-robustness This currently does no validation of the statement.
async fn execute<S>(&self, sql: S) -> Result<(), Error>
where
S: Into<String>,
{
self.execute_with_body(sql).await?;
Ok(())
match result {
Ok(result) => result.map_err(Error::from),
Err(_) => Err(Error::DatabaseUnavailable(format!(
"SQL query timed out after {elapsed:?}"
))),
}
}

// Execute a generic SQL statement, awaiting the response as text
Expand Down Expand Up @@ -1281,14 +1291,14 @@ impl Client {
trace!(self.log, "no new timeseries schema in database");
return Ok(());
};
if data.n_rows == 0 {
if data.n_rows() == 0 {
trace!(self.log, "no new timeseries schema in database");
return Ok(());
}
trace!(
self.log,
"retrieved new timeseries schema";
"n_schema" => data.n_rows,
"n_schema" => data.n_rows(),
);
for new_schema in TimeseriesSchema::from_block(data)?.into_iter() {
schema.insert(new_schema.timeseries_name.clone(), new_schema);
Expand Down Expand Up @@ -3492,41 +3502,44 @@ mod tests {
// Insert a record from this datum.
const TIMESERIES_NAME: &str = "foo:bar";
const TIMESERIES_KEY: u64 = 101;
let (measurement_table, inserted_row) =
crate::model::unroll_measurement_row_impl(
let (measurement_table, inserted_block) =
crate::model::measurements::extract_measurement_as_block_impl(
TIMESERIES_NAME.to_string(),
TIMESERIES_KEY,
&measurement,
);
let insert_sql = format!(
"INSERT INTO {measurement_table} FORMAT JSONEachRow {inserted_row}",
"INSERT INTO {}.{} FORMAT Native ",
crate::DATABASE_NAME,
measurement_table,
);
println!("Inserted row: {}", inserted_row);
println!("Expected measurement: {:#?}", measurement);
println!("Inserted block: {:#?}", inserted_block);
client
.execute_native(&insert_sql)
.insert_native(&insert_sql, inserted_block)
.await
.expect("Failed to insert measurement row");
.expect("Failed to insert measurement block");

// Select it exactly back out.
let select_sql = format!(
"SELECT * FROM {} WHERE timestamp = '{}' FORMAT {};",
"SELECT * FROM {}.{} WHERE timestamp = '{}'",
crate::DATABASE_NAME,
measurement_table,
measurement.timestamp().format(crate::DATABASE_TIMESTAMP_FORMAT),
crate::DATABASE_SELECT_FORMAT,
);
let body = client
.execute_with_body(select_sql)
let selected_block = client
.execute_with_block(&select_sql)
.await
.expect("Failed to select measurement row")
.1;
let (_, actual_row) = crate::model::parse_measurement_from_row(
&body,
measurement.datum_type(),
);
println!("Actual row: {actual_row:?}");
.data
.expect("Should have selected some data block");
let actual_measurements = Measurement::from_block(&selected_block)
.expect("Failed to extract measurement from block");
println!("Actual measurements: {actual_measurements:#?}");
assert_eq!(actual_measurements.len(), 1);
assert_eq!(
actual_row, measurement,
"Actual and expected measurement rows do not match"
actual_measurements[0], measurement,
"Actual and expected measurements do not match"
);
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::path::PathBuf;
use thiserror::Error;

mod client;
pub mod model;
pub(crate) mod model;
pub mod native;
#[cfg(any(feature = "oxql", test))]
pub mod oxql;
Expand Down
72 changes: 72 additions & 0 deletions oximeter/db/src/model/columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Constants used for common column names.

// Copyright 2024 Oxide Computer Company

pub const TIMESERIES_NAME: &str = "timeseries_name";
pub const TIMESERIES_KEY: &str = "timeseries_key";
pub const DATUM_TYPE: &str = "datum_type";
pub const FIELDS_DOT_NAME: &str = "fields.name";
pub const FIELDS_DOT_SOURCE: &str = "fields.source";
pub const FIELDS_DOT_TYPE: &str = "fields.type";
pub const CREATED: &str = "created";
pub const START_TIME: &str = "start_time";
pub const TIMESTAMP: &str = "timestamp";
pub const FIELD_NAME: &str = "field_name";
pub const FIELD_VALUE: &str = "field_value";
pub const DATUM: &str = "datum";
pub const BINS: &str = "bins";
pub const COUNTS: &str = "counts";
pub const MIN: &str = "min";
pub const MAX: &str = "max";
pub const SUM_OF_SAMPLES: &str = "sum_of_samples";
pub const SQUARED_MEAN: &str = "squared_mean";
pub const P50_MARKER_HEIGHTS: &str = "p50_marker_heights";
pub const P50_MARKER_POSITIONS: &str = "p50_marker_positions";
pub const P50_DESIRED_MARKER_POSITIONS: &str = "p50_desired_marker_positions";
pub const P90_MARKER_HEIGHTS: &str = "p90_marker_heights";
pub const P90_MARKER_POSITIONS: &str = "p90_marker_positions";
pub const P90_DESIRED_MARKER_POSITIONS: &str = "p90_desired_marker_positions";
pub const P99_MARKER_HEIGHTS: &str = "p99_marker_heights";
pub const P99_MARKER_POSITIONS: &str = "p99_marker_positions";
pub const P99_DESIRED_MARKER_POSITIONS: &str = "p99_desired_marker_positions";

/// Supported quantiles for histograms.
#[derive(Clone, Copy, Debug, strum::EnumIter)]
pub enum Quantile {
P50,
P90,
P99,
}

impl Quantile {
/// Return the marker height column name.
pub const fn marker_heights(self) -> &'static str {
match self {
Quantile::P50 => P50_MARKER_HEIGHTS,
Quantile::P90 => P90_MARKER_HEIGHTS,
Quantile::P99 => P99_MARKER_HEIGHTS,
}
}

/// Return the marker position column name.
pub const fn marker_positions(self) -> &'static str {
match self {
Quantile::P50 => P50_MARKER_POSITIONS,
Quantile::P90 => P90_MARKER_POSITIONS,
Quantile::P99 => P99_MARKER_POSITIONS,
}
}

/// Return the desired marker position column name.
pub const fn desired_marker_positions(self) -> &'static str {
match self {
Quantile::P50 => P50_DESIRED_MARKER_POSITIONS,
Quantile::P90 => P90_DESIRED_MARKER_POSITIONS,
Quantile::P99 => P99_DESIRED_MARKER_POSITIONS,
}
}
}
Loading
Loading