Skip to content

Commit 1950876

Browse files
authored
Batch write support (#87)
* (feat) add support for batching writes * (refactor) hold precision in WriteQuery type * (test) fix test for new query type enum * (test) add test for batch query * (chore) stick to influxdb 1.8 for integration test * (fix) fix lint warnings
1 parent 7931c89 commit 1950876

File tree

7 files changed

+89
-64
lines changed

7 files changed

+89
-64
lines changed

.github/workflows/rust.yml

+17-17
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ jobs:
5050
runs-on: ubuntu-latest
5151
services:
5252
influxdb:
53-
image: influxdb
53+
image: influxdb:1.8
5454
ports:
5555
- 8086:8086
5656
authed_influxdb:
57-
image: influxdb
57+
image: influxdb:1.8
5858
ports:
5959
- 9086:8086
6060
env:
@@ -68,17 +68,17 @@ jobs:
6868
- uses: actions/checkout@v1
6969
- uses: dtolnay/rust-toolchain@stable
7070
- run: cargo test --package influxdb --package influxdb_derive --all-features --no-fail-fast
71-
71+
7272
coverage:
7373
name: Code Coverage (stable/ubuntu-20.04)
7474
runs-on: ubuntu-20.04
7575
services:
7676
influxdb:
77-
image: influxdb
77+
image: influxdb:1.8
7878
ports:
7979
- 8086:8086
8080
authed_influxdb:
81-
image: influxdb
81+
image: influxdb:1.8
8282
ports:
8383
- 9086:8086
8484
env:
@@ -87,19 +87,19 @@ jobs:
8787
INFLUXDB_ADMIN_PASSWORD: password
8888
INFLUXDB_USER: nopriv_user
8989
INFLUXDB_USER_PASSWORD: password
90-
90+
9191
steps:
9292
- uses: actions/checkout@v2
9393
- uses: dtolnay/rust-toolchain@stable
94-
94+
9595
- name: Get Rust Version
9696
id: rust-version
9797
run: echo "::set-output name=VERSION::$(cargo -V | head -n1 | awk '{print $2}')"
98-
98+
9999
- name: Get Tarpaulin Version
100100
id: tarpaulin-version
101101
run: echo "::set-output name=VERSION::$(wget -qO- 'https://api.github.com/repos/xd009642/tarpaulin/releases/latest' | jq -r '.tag_name')"
102-
102+
103103
- uses: actions/cache@v2
104104
with:
105105
path: |
@@ -108,12 +108,12 @@ jobs:
108108
~/.cargo/registry
109109
target
110110
key: ${{ runner.os }}-cargo-${{ steps.rust-version.outputs.VERSION }}-tarpaulin-${{ steps.tarpaulin-version.outputs.VERSION }} }}
111-
111+
112112
- name: Install Tarpaulin
113113
run: |
114114
ls -lh ~/.cargo/bin
115115
test -e ~/.cargo/bin/cargo-tarpaulin || cargo install cargo-tarpaulin --version ${{ steps.tarpaulin-version.outputs.VERSION }}
116-
116+
117117
- name: Run Tarpaulin coverage tests
118118
run: |
119119
cargo tarpaulin -v \
@@ -127,14 +127,14 @@ jobs:
127127
env:
128128
RUST_BACKTRACE: 1
129129
RUST_LOG: info
130-
130+
131131
- uses: actions/upload-artifact@v2
132132
with:
133133
name: tarpaulin-report
134134
path: |
135135
tarpaulin-report.json
136136
tarpaulin-report.html
137-
137+
138138
pages:
139139
runs-on: ubuntu-20.04
140140
needs:
@@ -144,19 +144,19 @@ jobs:
144144
- uses: actions/checkout@v2
145145
with:
146146
ref: gh-pages
147-
147+
148148
- uses: actions/download-artifact@v2
149149
with:
150150
name: tarpaulin-report
151-
151+
152152
- run: |
153153
coverage=$(jq '.files | { covered: map(.covered) | add, coverable: map(.coverable) | add } | .covered / .coverable * 10000 | round | . / 100' tarpaulin-report.json)
154154
color=$([[ $coverage < 80 ]] && printf yellow || printf brightgreen)
155155
wget -qO coverage.svg "https://img.shields.io/badge/coverage-$coverage%25-$color"
156-
156+
157157
git add coverage.svg tarpaulin-report.html
158158
git status
159-
159+
160160
- uses: stefanzweifel/git-auto-commit-action@v4
161161
with:
162162
commit_message: "GitHub Pages for ${{ github.sha }}"

influxdb/src/client/mod.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use futures::prelude::*;
1919
use surf::{self, Client as SurfClient, StatusCode};
2020

21-
use crate::query::QueryTypes;
21+
use crate::query::QueryType;
2222
use crate::Error;
2323
use crate::Query;
2424
use std::collections::HashMap;
@@ -159,14 +159,13 @@ impl Client {
159159
pub async fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
160160
where
161161
Q: Query,
162-
&'q Q: Into<QueryTypes<'q>>,
163162
{
164163
let query = q.build().map_err(|err| Error::InvalidQueryError {
165164
error: err.to_string(),
166165
})?;
167166

168-
let request_builder = match q.into() {
169-
QueryTypes::Read(_) => {
167+
let request_builder = match q.get_type() {
168+
QueryType::ReadQuery => {
170169
let read_query = query.get();
171170
let url = &format!("{}/query", &self.url);
172171
let mut parameters = self.parameters.as_ref().clone();
@@ -178,10 +177,10 @@ impl Client {
178177
self.client.post(url).query(&parameters)
179178
}
180179
}
181-
QueryTypes::Write(write_query) => {
180+
QueryType::WriteQuery(precision) => {
182181
let url = &format!("{}/write", &self.url);
183182
let mut parameters = self.parameters.as_ref().clone();
184-
parameters.insert("precision", write_query.get_precision());
183+
parameters.insert("precision", precision);
185184

186185
self.client.post(url).body(query.get()).query(&parameters)
187186
}

influxdb/src/integrations/serde_integration/de.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ where
2222
Name,
2323
Columns,
2424
Values,
25-
};
25+
}
2626

2727
struct SeriesVisitor<T> {
2828
_inner_type: PhantomData<T>,
29-
};
29+
}
3030

3131
impl<'de, T> Visitor<'de> for SeriesVisitor<T>
3232
where
@@ -115,12 +115,12 @@ where
115115
Tags,
116116
Columns,
117117
Values,
118-
};
118+
}
119119

120120
struct SeriesVisitor<TAG, T> {
121121
_tag_type: PhantomData<TAG>,
122122
_value_type: PhantomData<T>,
123-
};
123+
}
124124

125125
impl<'de, TAG, T> Visitor<'de> for SeriesVisitor<TAG, T>
126126
where

influxdb/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub use error::Error;
8282
pub use query::{
8383
read_query::ReadQuery,
8484
write_query::{Type, WriteQuery},
85-
InfluxDbWriteable, Query, QueryType, QueryTypes, Timestamp, ValidQuery,
85+
InfluxDbWriteable, Query, QueryType, Timestamp, ValidQuery,
8686
};
8787

8888
#[cfg(feature = "use-serde")]

influxdb/src/query/mod.rs

+5-22
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ impl fmt::Display for Timestamp {
5555
}
5656
}
5757

58-
impl Into<DateTime<Utc>> for Timestamp {
59-
fn into(self) -> DateTime<Utc> {
60-
match self {
58+
impl From<Timestamp> for DateTime<Utc> {
59+
fn from(ts: Timestamp) -> DateTime<Utc> {
60+
match ts {
6161
Timestamp::Hours(h) => {
6262
let nanos =
6363
h * MINUTES_PER_HOUR * SECONDS_PER_MINUTE * MILLIS_PER_SECOND * NANOS_PER_MILLI;
@@ -93,24 +93,6 @@ where
9393
}
9494
}
9595

96-
/// Internal enum used to represent either type of query.
97-
pub enum QueryTypes<'a> {
98-
Read(&'a ReadQuery),
99-
Write(&'a WriteQuery),
100-
}
101-
102-
impl<'a> From<&'a ReadQuery> for QueryTypes<'a> {
103-
fn from(query: &'a ReadQuery) -> Self {
104-
Self::Read(query)
105-
}
106-
}
107-
108-
impl<'a> From<&'a WriteQuery> for QueryTypes<'a> {
109-
fn from(query: &'a WriteQuery) -> Self {
110-
Self::Write(query)
111-
}
112-
}
113-
11496
pub trait Query {
11597
/// Builds valid InfluxSQL which can be run against the Database.
11698
/// In case no fields have been specified, it will return an error,
@@ -192,7 +174,8 @@ impl PartialEq<&str> for ValidQuery {
192174
#[derive(PartialEq, Debug)]
193175
pub enum QueryType {
194176
ReadQuery,
195-
WriteQuery,
177+
/// write query with precision
178+
WriteQuery(String),
196179
}
197180

198181
#[cfg(test)]

influxdb/src/query/write_query.rs

+45-2
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,29 @@ impl Query for WriteQuery {
205205
}
206206

207207
fn get_type(&self) -> QueryType {
208-
QueryType::WriteQuery
208+
QueryType::WriteQuery(self.get_precision())
209+
}
210+
}
211+
212+
impl Query for Vec<WriteQuery> {
213+
fn build(&self) -> Result<ValidQuery, Error> {
214+
let mut qlines = Vec::new();
215+
216+
for q in self {
217+
let valid_query = q.build()?;
218+
qlines.push(valid_query.0);
219+
}
220+
221+
Ok(ValidQuery(qlines.join("\n")))
222+
}
223+
224+
fn get_type(&self) -> QueryType {
225+
QueryType::WriteQuery(
226+
self.get(0)
227+
.map(|q| q.get_precision())
228+
// use "ms" as placeholder if query is empty
229+
.unwrap_or_else(|| "ms".to_owned()),
230+
)
209231
}
210232
}
211233

@@ -296,7 +318,7 @@ mod tests {
296318
.add_tag("location", "us-midwest")
297319
.add_tag("season", "summer");
298320

299-
assert_eq!(query.get_type(), QueryType::WriteQuery);
321+
assert_eq!(query.get_type(), QueryType::WriteQuery("h".to_owned()));
300322
}
301323

302324
#[test]
@@ -318,4 +340,25 @@ mod tests {
318340
r#"wea\,\ ther=,location=us-midwest,loc\,\ \="ation=us\,\ \"mid\=west temperature=82i,"temp\=era\,t\ ure"="too\"\\\\hot",float=82 11"#
319341
);
320342
}
343+
344+
#[test]
345+
fn test_batch() {
346+
let q0 = Timestamp::Hours(11)
347+
.into_query("weather")
348+
.add_field("temperature", 82)
349+
.add_tag("location", "us-midwest");
350+
351+
let q1 = Timestamp::Hours(12)
352+
.into_query("weather")
353+
.add_field("temperature", 65)
354+
.add_tag("location", "us-midwest");
355+
356+
let query = vec![q0, q1].build();
357+
358+
assert_eq!(
359+
query.unwrap().get(),
360+
r#"weather,location=us-midwest temperature=82i 11
361+
weather,location=us-midwest temperature=65i 12"#
362+
);
363+
}
321364
}

influxdb/tests/integration_tests.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ async fn test_connection_error() {
5757
assert_result_err(&read_result);
5858
match read_result {
5959
Err(Error::ConnectionError { .. }) => {}
60-
_ => panic!(format!(
60+
_ => panic!(
6161
"Should cause a ConnectionError: {}",
6262
read_result.unwrap_err()
63-
)),
63+
),
6464
}
6565
}
6666

@@ -139,21 +139,21 @@ async fn test_wrong_authed_write_and_read() {
139139
assert_result_err(&write_result);
140140
match write_result {
141141
Err(Error::AuthorizationError) => {}
142-
_ => panic!(format!(
142+
_ => panic!(
143143
"Should be an AuthorizationError: {}",
144144
write_result.unwrap_err()
145-
)),
145+
),
146146
}
147147

148148
let read_query = Query::raw_read_query("SELECT * FROM weather");
149149
let read_result = client.query(&read_query).await;
150150
assert_result_err(&read_result);
151151
match read_result {
152152
Err(Error::AuthorizationError) => {}
153-
_ => panic!(format!(
153+
_ => panic!(
154154
"Should be an AuthorizationError: {}",
155155
read_result.unwrap_err()
156-
)),
156+
),
157157
}
158158

159159
let client = Client::new("http://localhost:9086", TEST_NAME)
@@ -163,10 +163,10 @@ async fn test_wrong_authed_write_and_read() {
163163
assert_result_err(&read_result);
164164
match read_result {
165165
Err(Error::AuthenticationError) => {}
166-
_ => panic!(format!(
166+
_ => panic!(
167167
"Should be an AuthenticationError: {}",
168168
read_result.unwrap_err()
169-
)),
169+
),
170170
}
171171
},
172172
|| async move {
@@ -207,21 +207,21 @@ async fn test_non_authed_write_and_read() {
207207
assert_result_err(&write_result);
208208
match write_result {
209209
Err(Error::AuthorizationError) => {}
210-
_ => panic!(format!(
210+
_ => panic!(
211211
"Should be an AuthorizationError: {}",
212212
write_result.unwrap_err()
213-
)),
213+
),
214214
}
215215

216216
let read_query = Query::raw_read_query("SELECT * FROM weather");
217217
let read_result = non_authed_client.query(&read_query).await;
218218
assert_result_err(&read_result);
219219
match read_result {
220220
Err(Error::AuthorizationError) => {}
221-
_ => panic!(format!(
221+
_ => panic!(
222222
"Should be an AuthorizationError: {}",
223223
read_result.unwrap_err()
224-
)),
224+
),
225225
}
226226
},
227227
|| async move {

0 commit comments

Comments
 (0)