Skip to content

Allow GROUP BY queries by providing deserialize_next_tagged to deserialize the group fields #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion influxdb/src/integrations/serde_integration/de.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::Series;
use super::{Series, TaggedSeries};
use serde::de::{
value, DeserializeSeed, Deserializer, Error, IntoDeserializer, MapAccess, SeqAccess, Visitor,
};
Expand Down Expand Up @@ -96,6 +96,109 @@ where
}
}

// Based on https://serde.rs/deserialize-struct.html
impl<'de, TAG, T> Deserialize<'de> for TaggedSeries<TAG, T>
where
TAG: Deserialize<'de>,
T: Deserialize<'de>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Field name deserializer
#[derive(Deserialize)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Field {
Name,
Tags,
Columns,
Values,
};

struct SeriesVisitor<TAG, T> {
_tag_type: PhantomData<TAG>,
_value_type: PhantomData<T>,
};

impl<'de, TAG, T> Visitor<'de> for SeriesVisitor<TAG, T>
where
TAG: Deserialize<'de>,
T: Deserialize<'de>,
{
type Value = TaggedSeries<TAG, T>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct TaggedSeries")
}

fn visit_map<V>(self, mut map: V) -> Result<TaggedSeries<TAG, T>, V::Error>
where
V: MapAccess<'de>,
{
let mut name = None;
let mut tags: Option<TAG> = None;
let mut columns: Option<Vec<String>> = None;
let mut values: Option<Vec<T>> = None;
while let Some(key) = map.next_key()? {
match key {
Field::Name => {
if name.is_some() {
return Err(Error::duplicate_field("name"));
}
name = Some(map.next_value()?);
}
Field::Tags => {
if tags.is_some() {
return Err(Error::duplicate_field("tags"));
}
tags = Some(map.next_value()?);
}
Field::Columns => {
if columns.is_some() {
return Err(Error::duplicate_field("columns"));
}
columns = Some(map.next_value()?);
}
Field::Values => {
if values.is_some() {
return Err(Error::duplicate_field("values"));
}
// Error out if "values" is encountered before "columns"
// Hopefully, InfluxDB never does this.
if columns.is_none() {
return Err(Error::custom(
"series values encountered before columns",
));
}
// Deserialize using a HeaderVec deserializer
// seeded with the headers from the "columns" field
values = Some(map.next_value_seed(HeaderVec::<T> {
header: columns.as_ref().unwrap(),
_inner_type: PhantomData,
})?);
}
}
}
let name = name.ok_or_else(|| Error::missing_field("name"))?;
let tags = tags.ok_or_else(|| Error::missing_field("tags"))?;
let values = values.ok_or_else(|| Error::missing_field("values"))?;
Ok(TaggedSeries { name, tags, values })
}
}

const FIELDS: &[&str] = &["name", "tags", "values"];
deserializer.deserialize_struct(
"TaggedSeries",
FIELDS,
SeriesVisitor::<TAG, T> {
_tag_type: PhantomData,
_value_type: PhantomData,
},
)
}
}

// Deserializer that takes a header as a seed
// and deserializes an array of arrays into a
// Vec of map-like values using the header as
Expand Down
29 changes: 29 additions & 0 deletions influxdb/src/integrations/serde_integration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ impl DatabaseQueryResult {
}
})
}

pub fn deserialize_next_tagged<TAG, T: 'static>(
&mut self,
) -> Result<TaggedReturn<TAG, T>, Error>
where
TAG: DeserializeOwned + Send,
T: DeserializeOwned + Send,
{
serde_json::from_value::<TaggedReturn<TAG, T>>(self.results.remove(0)).map_err(|err| {
Error::DeserializationError {
error: format!("could not deserialize: {}", err),
}
})
}
}

#[derive(Deserialize, Debug)]
Expand All @@ -93,6 +107,21 @@ pub struct Series<T> {
pub values: Vec<T>,
}

// Wrapper for the list of tagged series returned by one statement of a
// GROUP BY query; this is what `deserialize_next_tagged` produces.
#[derive(Deserialize, Debug)]
#[doc(hidden)]
pub struct TaggedReturn<TAG, T> {
// Defaults to an empty Vec when the statement result contains no "series" key
#[serde(default = "Vec::new")]
pub series: Vec<TaggedSeries<TAG, T>>,
}

#[derive(Debug)]
/// Represents a returned series from InfluxDB, as produced by a GROUP BY
/// query: `tags` holds the group-key fields deserialized into `TAG`, and
/// `values` holds the rows of the series deserialized into `T`.
/// (Deserialize is implemented by hand in `de.rs` rather than derived,
/// since decoding `values` requires the `columns` header.)
pub struct TaggedSeries<TAG, T> {
// Measurement name of the series
pub name: String,
// GROUP BY tag values for this series
pub tags: TAG,
// One entry per returned row
pub values: Vec<T>,
}

impl Client {
pub async fn json_query(&self, q: ReadQuery) -> Result<DatabaseQueryResult, Error> {
let query = q.build().unwrap();
Expand Down
63 changes: 63 additions & 0 deletions influxdb/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,69 @@ async fn test_json_query() {
.await;
}

/// INTEGRATION TEST
///
/// This test case tests whether the response to a GROUP BY can be parsed by
/// deserialize_next_tagged into a tags struct
#[tokio::test]
#[cfg(feature = "use-serde")]
async fn test_json_query_tagged() {
use serde::Deserialize;

const TEST_NAME: &str = "test_json_query_tagged";

run_test(
|| async move {
create_db(TEST_NAME).await.expect("could not setup db");

let client = create_client(TEST_NAME);

// Write a single point carrying one tag ("location") and one field
// ("temperature") so the GROUP BY below yields exactly one series.
let write_query = Timestamp::Hours(11)
.into_query("weather")
.add_tag("location", "London")
.add_field("temperature", 82);
let write_result = client.query(&write_query).await;
assert_result_ok(&write_result);

// Target type for the GROUP BY tags object
#[derive(Deserialize, Debug, PartialEq)]
struct WeatherMeta {
location: String,
}

// Target type for each returned row
#[derive(Deserialize, Debug, PartialEq)]
struct Weather {
time: String,
temperature: i32,
}

let query = Query::raw_read_query("SELECT * FROM weather GROUP BY location");
let result = client.json_query(query).await.and_then(|mut db_result| {
db_result.deserialize_next_tagged::<WeatherMeta, Weather>()
});
assert_result_ok(&result);
let result = result.unwrap();

// The GROUP BY key must round-trip into the tags struct...
assert_eq!(
result.series[0].tags,
WeatherMeta {
location: "London".to_string(),
}
);
// ...and the written point must round-trip into the row struct
// (Timestamp::Hours(11) corresponds to 11:00 on the Unix epoch day).
assert_eq!(
result.series[0].values[0],
Weather {
time: "1970-01-01T11:00:00Z".to_string(),
temperature: 82
}
);
},
|| async move {
delete_db(TEST_NAME).await.expect("could not clean up db");
},
)
.await;
}

/// INTEGRATION TEST
///
/// This test case tests whether JSON can be decoded from an InfluxDB response and whether that JSON
Expand Down