Skip to content

Batch write support #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ jobs:
runs-on: ubuntu-latest
services:
influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 8086:8086
authed_influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 9086:8086
env:
Expand All @@ -68,17 +68,17 @@ jobs:
- uses: actions/checkout@v1
- uses: dtolnay/rust-toolchain@stable
- run: cargo test --package influxdb --package influxdb_derive --all-features --no-fail-fast

coverage:
name: Code Coverage (stable/ubuntu-20.04)
runs-on: ubuntu-20.04
services:
influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 8086:8086
authed_influxdb:
image: influxdb
image: influxdb:1.8
ports:
- 9086:8086
env:
Expand All @@ -87,19 +87,19 @@ jobs:
INFLUXDB_ADMIN_PASSWORD: password
INFLUXDB_USER: nopriv_user
INFLUXDB_USER_PASSWORD: password

steps:
- uses: actions/checkout@v2
- uses: dtolnay/rust-toolchain@stable

- name: Get Rust Version
id: rust-version
run: echo "::set-output name=VERSION::$(cargo -V | head -n1 | awk '{print $2}')"

- name: Get Tarpaulin Version
id: tarpaulin-version
run: echo "::set-output name=VERSION::$(wget -qO- 'https://api.github.com/repos/xd009642/tarpaulin/releases/latest' | jq -r '.tag_name')"

- uses: actions/cache@v2
with:
path: |
Expand All @@ -108,12 +108,12 @@ jobs:
~/.cargo/registry
target
key: ${{ runner.os }}-cargo-${{ steps.rust-version.outputs.VERSION }}-tarpaulin-${{ steps.tarpaulin-version.outputs.VERSION }} }}

- name: Install Tarpaulin
run: |
ls -lh ~/.cargo/bin
test -e ~/.cargo/bin/cargo-tarpaulin || cargo install cargo-tarpaulin --version ${{ steps.tarpaulin-version.outputs.VERSION }}

- name: Run Tarpaulin coverage tests
run: |
cargo tarpaulin -v \
Expand All @@ -127,14 +127,14 @@ jobs:
env:
RUST_BACKTRACE: 1
RUST_LOG: info

- uses: actions/upload-artifact@v2
with:
name: tarpaulin-report
path: |
tarpaulin-report.json
tarpaulin-report.html

pages:
runs-on: ubuntu-20.04
needs:
Expand All @@ -144,19 +144,19 @@ jobs:
- uses: actions/checkout@v2
with:
ref: gh-pages

- uses: actions/download-artifact@v2
with:
name: tarpaulin-report

- run: |
coverage=$(jq '.files | { covered: map(.covered) | add, coverable: map(.coverable) | add } | .covered / .coverable * 10000 | round | . / 100' tarpaulin-report.json)
color=$([[ $coverage < 80 ]] && printf yellow || printf brightgreen)
wget -qO coverage.svg "https://img.shields.io/badge/coverage-$coverage%25-$color"

git add coverage.svg tarpaulin-report.html
git status

- uses: stefanzweifel/git-auto-commit-action@v4
with:
commit_message: "GitHub Pages for ${{ github.sha }}"
11 changes: 5 additions & 6 deletions influxdb/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use futures::prelude::*;
use surf::{self, Client as SurfClient, StatusCode};

use crate::query::QueryTypes;
use crate::query::QueryType;
use crate::Error;
use crate::Query;
use std::collections::HashMap;
Expand Down Expand Up @@ -159,14 +159,13 @@ impl Client {
pub async fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
where
Q: Query,
&'q Q: Into<QueryTypes<'q>>,
{
let query = q.build().map_err(|err| Error::InvalidQueryError {
error: err.to_string(),
})?;

let request_builder = match q.into() {
QueryTypes::Read(_) => {
let request_builder = match q.get_type() {
QueryType::ReadQuery => {
let read_query = query.get();
let url = &format!("{}/query", &self.url);
let mut parameters = self.parameters.as_ref().clone();
Expand All @@ -178,10 +177,10 @@ impl Client {
self.client.post(url).query(&parameters)
}
}
QueryTypes::Write(write_query) => {
QueryType::WriteQuery(precision) => {
let url = &format!("{}/write", &self.url);
let mut parameters = self.parameters.as_ref().clone();
parameters.insert("precision", write_query.get_precision());
parameters.insert("precision", precision);

self.client.post(url).body(query.get()).query(&parameters)
}
Expand Down
8 changes: 4 additions & 4 deletions influxdb/src/integrations/serde_integration/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ where
Name,
Columns,
Values,
};
}

struct SeriesVisitor<T> {
_inner_type: PhantomData<T>,
};
}

impl<'de, T> Visitor<'de> for SeriesVisitor<T>
where
Expand Down Expand Up @@ -115,12 +115,12 @@ where
Tags,
Columns,
Values,
};
}

struct SeriesVisitor<TAG, T> {
_tag_type: PhantomData<TAG>,
_value_type: PhantomData<T>,
};
}

impl<'de, TAG, T> Visitor<'de> for SeriesVisitor<TAG, T>
where
Expand Down
2 changes: 1 addition & 1 deletion influxdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use error::Error;
pub use query::{
read_query::ReadQuery,
write_query::{Type, WriteQuery},
InfluxDbWriteable, Query, QueryType, QueryTypes, Timestamp, ValidQuery,
InfluxDbWriteable, Query, QueryType, Timestamp, ValidQuery,
};

#[cfg(feature = "use-serde")]
Expand Down
27 changes: 5 additions & 22 deletions influxdb/src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ impl fmt::Display for Timestamp {
}
}

impl Into<DateTime<Utc>> for Timestamp {
fn into(self) -> DateTime<Utc> {
match self {
impl From<Timestamp> for DateTime<Utc> {
fn from(ts: Timestamp) -> DateTime<Utc> {
match ts {
Timestamp::Hours(h) => {
let nanos =
h * MINUTES_PER_HOUR * SECONDS_PER_MINUTE * MILLIS_PER_SECOND * NANOS_PER_MILLI;
Expand Down Expand Up @@ -93,24 +93,6 @@ where
}
}

/// Internal enum used to represent either type of query.
pub enum QueryTypes<'a> {
Read(&'a ReadQuery),
Write(&'a WriteQuery),
}

impl<'a> From<&'a ReadQuery> for QueryTypes<'a> {
fn from(query: &'a ReadQuery) -> Self {
Self::Read(query)
}
}

impl<'a> From<&'a WriteQuery> for QueryTypes<'a> {
fn from(query: &'a WriteQuery) -> Self {
Self::Write(query)
}
}

pub trait Query {
/// Builds valid InfluxSQL which can be run against the Database.
/// In case no fields have been specified, it will return an error,
Expand Down Expand Up @@ -192,7 +174,8 @@ impl PartialEq<&str> for ValidQuery {
#[derive(PartialEq, Debug)]
pub enum QueryType {
ReadQuery,
WriteQuery,
/// write query with precision
WriteQuery(String),
}

#[cfg(test)]
Expand Down
47 changes: 45 additions & 2 deletions influxdb/src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,29 @@ impl Query for WriteQuery {
}

fn get_type(&self) -> QueryType {
QueryType::WriteQuery
QueryType::WriteQuery(self.get_precision())
}
}

impl Query for Vec<WriteQuery> {
fn build(&self) -> Result<ValidQuery, Error> {
let mut qlines = Vec::new();

for q in self {
let valid_query = q.build()?;
qlines.push(valid_query.0);
}

Ok(ValidQuery(qlines.join("\n")))
}

fn get_type(&self) -> QueryType {
QueryType::WriteQuery(
self.get(0)
.map(|q| q.get_precision())
// use "ms" as placeholder if query is empty
.unwrap_or_else(|| "ms".to_owned()),
)
}
}

Expand Down Expand Up @@ -296,7 +318,7 @@ mod tests {
.add_tag("location", "us-midwest")
.add_tag("season", "summer");

assert_eq!(query.get_type(), QueryType::WriteQuery);
assert_eq!(query.get_type(), QueryType::WriteQuery("h".to_owned()));
}

#[test]
Expand All @@ -318,4 +340,25 @@ mod tests {
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ \"mid\=west temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
);
}

#[test]
fn test_batch() {
let q0 = Timestamp::Hours(11)
.into_query("weather")
.add_field("temperature", 82)
.add_tag("location", "us-midwest");

let q1 = Timestamp::Hours(12)
.into_query("weather")
.add_field("temperature", 65)
.add_tag("location", "us-midwest");

let query = vec![q0, q1].build();

assert_eq!(
query.unwrap().get(),
r#"weather,location=us-midwest temperature=82i 11
weather,location=us-midwest temperature=65i 12"#
);
}
}
24 changes: 12 additions & 12 deletions influxdb/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ async fn test_connection_error() {
assert_result_err(&read_result);
match read_result {
Err(Error::ConnectionError { .. }) => {}
_ => panic!(format!(
_ => panic!(
"Should cause a ConnectionError: {}",
read_result.unwrap_err()
)),
),
}
}

Expand Down Expand Up @@ -139,21 +139,21 @@ async fn test_wrong_authed_write_and_read() {
assert_result_err(&write_result);
match write_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
write_result.unwrap_err()
)),
),
}

let read_query = Query::raw_read_query("SELECT * FROM weather");
let read_result = client.query(&read_query).await;
assert_result_err(&read_result);
match read_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
read_result.unwrap_err()
)),
),
}

let client = Client::new("http://localhost:9086", TEST_NAME)
Expand All @@ -163,10 +163,10 @@ async fn test_wrong_authed_write_and_read() {
assert_result_err(&read_result);
match read_result {
Err(Error::AuthenticationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthenticationError: {}",
read_result.unwrap_err()
)),
),
}
},
|| async move {
Expand Down Expand Up @@ -207,21 +207,21 @@ async fn test_non_authed_write_and_read() {
assert_result_err(&write_result);
match write_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
write_result.unwrap_err()
)),
),
}

let read_query = Query::raw_read_query("SELECT * FROM weather");
let read_result = non_authed_client.query(&read_query).await;
assert_result_err(&read_result);
match read_result {
Err(Error::AuthorizationError) => {}
_ => panic!(format!(
_ => panic!(
"Should be an AuthorizationError: {}",
read_result.unwrap_err()
)),
),
}
},
|| async move {
Expand Down