diff --git a/influxdb/src/client/mod.rs b/influxdb/src/client/mod.rs index e391708..55e0761 100644 --- a/influxdb/src/client/mod.rs +++ b/influxdb/src/client/mod.rs @@ -131,7 +131,7 @@ impl Client { /// Add authorization token to [`Client`](crate::Client) /// /// This is designed for influxdb 2.0's backward-compatible API which - /// requires authrozation by default. You can create such token from + /// requires authorization by default. You can create such token from /// console of influxdb 2.0 . pub fn with_token(mut self, token: S) -> Self where diff --git a/influxdb/src/query/line_proto_term.rs b/influxdb/src/query/line_proto_term.rs index dca0b2a..422ef7a 100644 --- a/influxdb/src/query/line_proto_term.rs +++ b/influxdb/src/query/line_proto_term.rs @@ -25,12 +25,22 @@ impl LineProtoTerm<'_> { match self { Measurement(x) => Self::escape_any(x, &COMMAS_SPACES), TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS), - FieldValue(x) => Self::escape_field_value(x), + FieldValue(x) => Self::escape_field_value(x, false), TagValue(x) => Self::escape_tag_value(x), } } - fn escape_field_value(v: &Type) -> String { + pub fn escape_v2(self) -> String { + use LineProtoTerm::*; + match self { + Measurement(x) => Self::escape_any(x, &COMMAS_SPACES), + TagKey(x) | FieldKey(x) => Self::escape_any(x, &COMMAS_SPACES_EQUALS), + FieldValue(x) => Self::escape_field_value(x, true), + TagValue(x) => Self::escape_tag_value(x), + } + } + + fn escape_field_value(v: &Type, use_v2: bool) -> String { use Type::*; match v { Boolean(v) => { @@ -43,7 +53,13 @@ impl LineProtoTerm<'_> { .to_string(), Float(v) => v.to_string(), SignedInteger(v) => format!("{}i", v), - UnsignedInteger(v) => format!("{}u", v), + UnsignedInteger(v) => { + if use_v2 { + format!("{}u", v) + } else { + format!("{}i", v) + } + } Text(v) => format!(r#""{}""#, Self::escape_any(v, "ES_SLASHES)), } } @@ -112,6 +128,12 @@ mod test { assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#); assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#); + assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape(), r#"0i"#); + assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape(), r#"83i"#); + + assert_eq!(FieldValue(&Type::UnsignedInteger(0)).escape_v2(), r#"0u"#); + assert_eq!(FieldValue(&Type::UnsignedInteger(83)).escape_v2(), r#"83u"#); + assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#); assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#); assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#); diff --git a/influxdb/src/query/mod.rs b/influxdb/src/query/mod.rs index d12275a..0546b2b 100644 --- a/influxdb/src/query/mod.rs +++ b/influxdb/src/query/mod.rs @@ -112,12 +112,36 @@ pub trait Query { /// ``` fn build(&self) -> Result; + /// Like [build] but with additional support for unsigned integers in the line protocol. + /// Please note, this crate can only interact with InfluxDB 2.0 in compatibility mode + /// and does not natively support InfluxDB 2.0. + /// + /// # Examples + /// + /// ```rust + /// use influxdb::{Query, Timestamp}; + /// use influxdb::InfluxDbWriteable; + /// + /// let use_v2 = true; + /// + /// let invalid_query = Timestamp::Nanoseconds(0).into_query("measurement").build_with_opts(use_v2); + /// assert!(invalid_query.is_err()); + /// + /// let valid_query = Timestamp::Nanoseconds(0).into_query("measurement").add_field("myfield1", 11).build_with_opts(use_v2); + /// assert!(valid_query.is_ok()); + /// ``` + fn build_with_opts(&self, use_v2: bool) -> Result; + fn get_type(&self) -> QueryType; } impl Query for &Q { fn build(&self) -> Result { - Q::build(self) + Q::build_with_opts(self, false) + } + + fn build_with_opts(&self, use_v2: bool) -> Result { + Q::build_with_opts(self, use_v2) } fn get_type(&self) -> QueryType { @@ -130,6 +154,10 @@ impl Query for Box { Q::build(self) } + fn build_with_opts(&self, use_v2: bool) -> Result { + Q::build_with_opts(self, use_v2) + } + fn get_type(&self) -> QueryType { Q::get_type(self) } diff --git a/influxdb/src/query/read_query.rs b/influxdb/src/query/read_query.rs index fdeec3c..a102146 100644 --- a/influxdb/src/query/read_query.rs +++ b/influxdb/src/query/read_query.rs @@ -38,6 +38,10 @@ impl Query for ReadQuery { Ok(ValidQuery(self.queries.join(";"))) } + fn build_with_opts(&self, _use_v2: bool) -> Result { + Ok(ValidQuery(self.queries.join(";"))) + } + fn get_type(&self) -> QueryType { QueryType::ReadQuery } diff --git a/influxdb/src/query/write_query.rs b/influxdb/src/query/write_query.rs index d9e8b08..7014731 100644 --- a/influxdb/src/query/write_query.rs +++ b/influxdb/src/query/write_query.rs @@ -163,6 +163,10 @@ where impl Query for WriteQuery { fn build(&self) -> Result { + self.build_with_opts(false) + } + + fn build_with_opts(&self, use_v2: bool) -> Result { if self.fields.is_empty() { return Err(Error::InvalidQueryError { error: "fields cannot be empty".to_string(), @@ -173,10 +177,20 @@ impl Query for WriteQuery { .tags .iter() .map(|(tag, value)| { + let escaped_tag_key = if use_v2 { + LineProtoTerm::TagKey(tag).escape_v2() + } else { + LineProtoTerm::TagKey(tag).escape() + }; + let escaped_tag_value = if use_v2 { + LineProtoTerm::TagValue(value).escape_v2() + } else { + LineProtoTerm::TagValue(value).escape() + }; format!( "{tag}={value}", - tag = LineProtoTerm::TagKey(tag).escape(), - value = LineProtoTerm::TagValue(value).escape(), + tag = escaped_tag_key, + value = escaped_tag_value, ) }) .collect::>() @@ -189,18 +203,34 @@ impl Query for WriteQuery { .fields .iter() .map(|(field, value)| { + let escaped_field_key = if use_v2 { + LineProtoTerm::FieldKey(field).escape_v2() + } else { + LineProtoTerm::FieldKey(field).escape() + }; + let escaped_field_value = if use_v2 { + LineProtoTerm::FieldValue(value).escape_v2() + } else { + LineProtoTerm::FieldValue(value).escape() + }; format!( "{field}={value}", - field = LineProtoTerm::FieldKey(field).escape(), - value = LineProtoTerm::FieldValue(value).escape(), + field = escaped_field_key, + value = escaped_field_value, ) }) .collect::>() .join(","); + let escaped_measurement = if use_v2 { + LineProtoTerm::Measurement(&self.measurement).escape_v2() + } else { + LineProtoTerm::Measurement(&self.measurement).escape() + }; + Ok(ValidQuery(format!( "{measurement}{tags} {fields} {time}", - measurement = LineProtoTerm::Measurement(&self.measurement).escape(), + measurement = escaped_measurement, tags = tags, fields = fields, time = self.timestamp @@ -224,6 +254,17 @@ impl Query for Vec { Ok(ValidQuery(qlines.join("\n"))) } + fn build_with_opts(&self, use_v2: bool) -> Result { + let mut qlines = Vec::new(); + + for q in self { + let valid_query = q.build_with_opts(use_v2)?; + qlines.push(valid_query.0); + } + + Ok(ValidQuery(qlines.join("\n"))) + } + fn get_type(&self) -> QueryType { QueryType::WriteQuery( self.get(0) @@ -267,6 +308,22 @@ mod tests { .add_field("temperature_unsigned", 82u64) .build(); + assert!(query.is_ok(), "Query was empty"); + assert_eq!( + query.unwrap(), + "weather temperature=82i,wind_strength=3.7,temperature_unsigned=82i 11" + ); + } + + #[test] + fn test_write_builder_multiple_fields_with_v2() { + let query = Timestamp::Hours(11) + .into_query("weather".to_string()) + .add_field("temperature", 82) + .add_field("wind_strength", 3.7) + .add_field("temperature_unsigned", 82u64) + .build_with_opts(true); + assert!(query.is_ok(), "Query was empty"); assert_eq!( query.unwrap(), @@ -282,6 +339,18 @@ mod tests { .add_tag("wind_strength", >::None) .build(); + assert!(query.is_ok(), "Query was empty"); + assert_eq!(query.unwrap(), "weather temperature=82i 11"); + } + + #[test] + fn test_write_builder_optional_fields_with_v2() { + let query = Timestamp::Hours(11) + .into_query("weather".to_string()) + .add_field("temperature", 82u64) + .add_tag("wind_strength", >::None) + .build_with_opts(true); + assert!(query.is_ok(), "Query was empty"); assert_eq!(query.unwrap(), "weather temperature=82u 11"); }