Skip to content

Fix Encoding/Escaping according to the InfluxDb Line-Protocol #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ futures = "0.3.4"
reqwest = { version = "0.10.1", features = ["json"] }
serde = { version = "1.0.104", features = ["derive"], optional = true }
serde_json = { version = "1.0.46", optional = true }
regex = "1.3.4"
lazy_static = "1.4.0"

# This is a temporary work around to fix a Failure-derive compilation error
# Should be removed when https://github.com/Empty2k12/influxdb-rust/issues/48 is being done
quote = "=1.0.2"

[features]
use-serde = ["serde", "serde_json"]
Expand Down
88 changes: 88 additions & 0 deletions src/query/line_proto_term.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/// InfluxDB Line Protocol escaping helper module.
/// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
use crate::Type;
use lazy_static::lazy_static;
use regex::Regex;

lazy_static! {
pub static ref COMMAS_SPACES: Regex = Regex::new("[, ]").unwrap();
pub static ref COMMAS_SPACES_EQUALS: Regex = Regex::new("[, =]").unwrap();
pub static ref QUOTES_SLASHES: Regex = Regex::new(r#"["\\]"#).unwrap();
}

pub enum LineProtoTerm<'a> {
Measurement(&'a str), // escape commas, spaces
TagKey(&'a str), // escape commas, equals, spaces
TagValue(&'a str), // escape commas, equals, spaces
FieldKey(&'a str), // escape commas, equals, spaces
FieldValue(&'a Type), // escape quotes, backslashes + quote
}

impl LineProtoTerm<'_> {
pub fn escape(self) -> String {
use LineProtoTerm::*;
match self {
Measurement(x) => Self::escape_any(x, &*COMMAS_SPACES),
TagKey(x) | TagValue(x) | FieldKey(x) => Self::escape_any(x, &*COMMAS_SPACES_EQUALS),
FieldValue(x) => Self::escape_field_value(x),
}
}

fn escape_field_value(v: &Type) -> String {
use Type::*;
match v {
Boolean(v) => {
if *v {
"true"
} else {
"false"
}
}
.to_string(),
Float(v) => v.to_string(),
SignedInteger(v) => format!("{}i", v),
UnsignedInteger(v) => format!("{}i", v),
Text(v) => format!("\"{}\"", Self::escape_any(v, &*QUOTES_SLASHES)),
}
}

fn escape_any(s: &str, re: &Regex) -> String {
re.replace_all(s, r#"\$0"#).to_string()
}
}

#[cfg(test)]
mod test {
use crate::query::line_proto_term::LineProtoTerm::*;
use crate::Type;

#[test]
fn test() {
assert_eq!(Measurement(r#"wea", ther"#).escape(), r#"wea"\,\ ther"#);
assert_eq!(TagKey(r#"locat\ ,=ion"#).escape(), r#"locat\\ \,\=ion"#);

assert_eq!(FieldValue(&Type::Boolean(true)).escape(), r#"true"#);
assert_eq!(FieldValue(&Type::Boolean(false)).escape(), r#"false"#);

assert_eq!(FieldValue(&Type::Float(0.0)).escape(), r#"0"#);
assert_eq!(FieldValue(&Type::Float(-0.1)).escape(), r#"-0.1"#);

assert_eq!(FieldValue(&Type::SignedInteger(0)).escape(), r#"0i"#);
assert_eq!(FieldValue(&Type::SignedInteger(83)).escape(), r#"83i"#);

assert_eq!(FieldValue(&Type::Text("".into())).escape(), r#""""#);
assert_eq!(FieldValue(&Type::Text("0".into())).escape(), r#""0""#);
assert_eq!(FieldValue(&Type::Text("\"".into())).escape(), r#""\"""#);
assert_eq!(
FieldValue(&Type::Text(r#"locat"\ ,=ion"#.into())).escape(),
r#""locat\"\\ ,=ion""#
);
}

#[test]
fn test_empty_tag_value() {
// InfluxDB doesn't support empty tag values. But that's a job
// of a calling site to validate an entire write request.
assert_eq!(TagValue("").escape(), "");
}
}
1 change: 1 addition & 0 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::convert::TryInto;

#[cfg(feature = "chrono_timestamps")]
pub mod consts;
mod line_proto_term;
pub mod read_query;
pub mod write_query;
use std::fmt;
Expand Down
57 changes: 44 additions & 13 deletions src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@
//!
//! Can only be instantiated by using Query::write_query

use crate::query::line_proto_term::LineProtoTerm;
use crate::query::{QueryType, ValidQuery};
use crate::{Error, Query, Timestamp};
use std::fmt::{Display, Formatter};

// todo: batch write queries

pub trait WriteField {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>);
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>);
}

impl<T: Into<Type>> WriteField for T {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
let val: Type = self.into();
fields.push((tag, val.to_string()));
fields.push((tag, val));
}
}

impl<T: Into<Type>> WriteField for Option<T> {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, String)>) {
fn add_to_fields(self, tag: String, fields: &mut Vec<(String, Type)>) {
if let Some(val) = self {
val.add_to_fields(tag, fields);
}
Expand All @@ -29,7 +30,7 @@ impl<T: Into<Type>> WriteField for Option<T> {

/// Internal Representation of a Write query that has not yet been built
pub struct WriteQuery {
fields: Vec<(String, String)>,
fields: Vec<(String, Type)>,
tags: Vec<(String, String)>,
measurement: String,
timestamp: Timestamp,
Expand Down Expand Up @@ -121,7 +122,7 @@ impl Display for Type {
Float(x) => write!(f, "{}", x),
SignedInteger(x) => write!(f, "{}", x),
UnsignedInteger(x) => write!(f, "{}", x),
Text(text) => write!(f, "\"{text}\"", text = text),
Text(text) => write!(f, "{text}", text = text),
}
}
}
Expand Down Expand Up @@ -159,22 +160,35 @@ impl Query for WriteQuery {
let mut tags = self
.tags
.iter()
.map(|(tag, value)| format!("{tag}={value}", tag = tag, value = value))
.map(|(tag, value)| {
format!(
"{tag}={value}",
tag = LineProtoTerm::TagKey(tag).escape(),
value = LineProtoTerm::TagValue(value).escape(),
)
})
.collect::<Vec<String>>()
.join(",");

if !tags.is_empty() {
tags.insert_str(0, ",");
}
let fields = self
.fields
.iter()
.map(|(field, value)| format!("{field}={value}", field = field, value = value))
.map(|(field, value)| {
format!(
"{field}={value}",
field = LineProtoTerm::FieldKey(field).escape(),
value = LineProtoTerm::FieldValue(value).escape(),
)
})
.collect::<Vec<String>>()
.join(",");

Ok(ValidQuery(format!(
"{measurement}{tags} {fields}{time}",
measurement = self.measurement,
measurement = LineProtoTerm::Measurement(&self.measurement).escape(),
tags = tags,
fields = fields,
time = match self.timestamp {
Expand Down Expand Up @@ -207,7 +221,7 @@ mod tests {
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82 11");
assert_eq!(query.unwrap(), "weather temperature=82i 11");
}

#[test]
Expand All @@ -220,7 +234,7 @@ mod tests {
assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
"weather temperature=82,wind_strength=3.7 11"
"weather temperature=82i,wind_strength=3.7 11"
);
}

Expand All @@ -232,7 +246,7 @@ mod tests {
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(query.unwrap(), "weather temperature=82 11");
assert_eq!(query.unwrap(), "weather temperature=82i 11");
}

#[test]
Expand All @@ -255,7 +269,7 @@ mod tests {
assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap(),
"weather,location=\"us-midwest\",season=\"summer\" temperature=82 11"
"weather,location=us-midwest,season=summer temperature=82i 11"
);
}

Expand All @@ -270,4 +284,21 @@ mod tests {

assert_eq!(query.get_type(), QueryType::WriteQuery);
}

#[test]
fn test_escaping() {
let query = Query::write_query(Timestamp::Hours(11), "wea, ther=")
.add_field("temperature", 82)
.add_field("\"temp=era,t ure\"", r#"too"\\hot"#)
.add_field("float", 82.0)
.add_tag("location", "us-midwest")
.add_tag("loc, =\"ation", "us, \"mid=west\"")
.build();

assert!(query.is_ok(), "Query was empty");
assert_eq!(
query.unwrap().get(),
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ "mid\=west" temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
);
}
}