From b67ed5c0e1c659e961e1d1d3ba8f4b0880104c6b Mon Sep 17 00:00:00 2001 From: Charlton Rodda Date: Tue, 11 Aug 2020 18:34:34 +0100 Subject: [PATCH] Add TaggedSeries and deserialize_next_tagged for GROUP BY --- .../src/integrations/serde_integration/de.rs | 105 +++++++++++++++++- .../src/integrations/serde_integration/mod.rs | 29 +++++ influxdb/tests/integration_tests.rs | 63 +++++++++++ 3 files changed, 196 insertions(+), 1 deletion(-) diff --git a/influxdb/src/integrations/serde_integration/de.rs b/influxdb/src/integrations/serde_integration/de.rs index 0a1343c..fd9bf99 100644 --- a/influxdb/src/integrations/serde_integration/de.rs +++ b/influxdb/src/integrations/serde_integration/de.rs @@ -1,4 +1,4 @@ -use super::Series; +use super::{Series, TaggedSeries}; use serde::de::{ value, DeserializeSeed, Deserializer, Error, IntoDeserializer, MapAccess, SeqAccess, Visitor, }; @@ -96,6 +96,109 @@ where } } +// Based on https://serde.rs/deserialize-struct.html +impl<'de, TAG, T> Deserialize<'de> for TaggedSeries +where + TAG: Deserialize<'de>, + T: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Field name deserializer + #[derive(Deserialize)] + #[serde(field_identifier, rename_all = "lowercase")] + enum Field { + Name, + Tags, + Columns, + Values, + }; + + struct SeriesVisitor { + _tag_type: PhantomData, + _value_type: PhantomData, + }; + + impl<'de, TAG, T> Visitor<'de> for SeriesVisitor + where + TAG: Deserialize<'de>, + T: Deserialize<'de>, + { + type Value = TaggedSeries; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("struct TaggedSeries") + } + + fn visit_map(self, mut map: V) -> Result, V::Error> + where + V: MapAccess<'de>, + { + let mut name = None; + let mut tags: Option = None; + let mut columns: Option> = None; + let mut values: Option> = None; + while let Some(key) = map.next_key()? { + match key { + Field::Name => { + if name.is_some() { + return Err(Error::duplicate_field("name")); + } + name = Some(map.next_value()?); + } + Field::Tags => { + if tags.is_some() { + return Err(Error::duplicate_field("tags")); + } + tags = Some(map.next_value()?); + } + Field::Columns => { + if columns.is_some() { + return Err(Error::duplicate_field("columns")); + } + columns = Some(map.next_value()?); + } + Field::Values => { + if values.is_some() { + return Err(Error::duplicate_field("values")); + } + // Error out if "values" is encountered before "columns" + // Hopefully, InfluxDB never does this. + if columns.is_none() { + return Err(Error::custom( + "series values encountered before columns", + )); + } + // Deserialize using a HeaderVec deserializer + // seeded with the headers from the "columns" field + values = Some(map.next_value_seed(HeaderVec:: { + header: columns.as_ref().unwrap(), + _inner_type: PhantomData, + })?); + } + } + } + let name = name.ok_or_else(|| Error::missing_field("name"))?; + let tags = tags.ok_or_else(|| Error::missing_field("tags"))?; + let values = values.ok_or_else(|| Error::missing_field("values"))?; + Ok(TaggedSeries { name, tags, values }) + } + } + + const FIELDS: &[&str] = &["name", "tags", "values"]; + deserializer.deserialize_struct( + "TaggedSeries", + FIELDS, + SeriesVisitor:: { + _tag_type: PhantomData, + _value_type: PhantomData, + }, + ) + } +} + // Deserializer that takes a header as a seed // and deserializes an array of arrays into a // Vec of map-like values using the header as diff --git a/influxdb/src/integrations/serde_integration/mod.rs b/influxdb/src/integrations/serde_integration/mod.rs index 91dddde..708dcc5 100644 --- a/influxdb/src/integrations/serde_integration/mod.rs +++ b/influxdb/src/integrations/serde_integration/mod.rs @@ -77,6 +77,20 @@ impl DatabaseQueryResult { } }) } + + pub fn deserialize_next_tagged( + &mut self, + ) -> Result, Error> + where + TAG: DeserializeOwned + Send, + T: DeserializeOwned + Send, + { + serde_json::from_value::>(self.results.remove(0)).map_err(|err| { + Error::DeserializationError { + error: format!("could not deserialize: {}", err), + } + }) + } } #[derive(Deserialize, Debug)] @@ -93,6 +107,21 @@ pub struct Series { pub values: Vec, } +#[derive(Deserialize, Debug)] +#[doc(hidden)] +pub struct TaggedReturn { + #[serde(default = "Vec::new")] + pub series: Vec>, +} + +#[derive(Debug)] +/// Represents a returned series from InfluxDB +pub struct TaggedSeries { + pub name: String, + pub tags: TAG, + pub values: Vec, +} + impl Client { pub async fn json_query(&self, q: ReadQuery) -> Result { let query = q.build().unwrap(); diff --git a/influxdb/tests/integration_tests.rs b/influxdb/tests/integration_tests.rs index 44d519a..938538d 100644 --- a/influxdb/tests/integration_tests.rs +++ b/influxdb/tests/integration_tests.rs @@ -358,6 +358,69 @@ async fn test_json_query() { .await; } +/// INTEGRATION TEST +/// +/// This test case tests whether the response to a GROUP BY can be parsed by +// deserialize_next_tagged into a tags struct +#[tokio::test] +#[cfg(feature = "use-serde")] +async fn test_json_query_tagged() { + use serde::Deserialize; + + const TEST_NAME: &str = "test_json_query_tagged"; + + run_test( + || async move { + create_db(TEST_NAME).await.expect("could not setup db"); + + let client = create_client(TEST_NAME); + + let write_query = Timestamp::Hours(11) + .into_query("weather") + .add_tag("location", "London") + .add_field("temperature", 82); + let write_result = client.query(&write_query).await; + assert_result_ok(&write_result); + + #[derive(Deserialize, Debug, PartialEq)] + struct WeatherMeta { + location: String, + } + + #[derive(Deserialize, Debug, PartialEq)] + struct Weather { + time: String, + temperature: i32, + } + + let query = Query::raw_read_query("SELECT * FROM weather GROUP BY location"); + let result = client.json_query(query).await.and_then(|mut db_result| { + db_result.deserialize_next_tagged::() + }); + assert_result_ok(&result); + let result = result.unwrap(); + + assert_eq!( + result.series[0].tags, + WeatherMeta { + location: "London".to_string(), + } + ); + assert_eq!( + result.series[0].values[0], + Weather { + time: "1970-01-01T11:00:00Z".to_string(), + temperature: 82 + } + ); + }, + || async move { + delete_db(TEST_NAME).await.expect("could not clean up db"); + }, + ) + .await; +} + /// INTEGRATION TEST /// /// This test case tests whether JSON can be decoded from a InfluxDB response and wether that JSON