Skip to content

WIP: Multiquery, Timestamp Support, Test Refactor #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.0.3] - 2019-07-14

### Added

- Possibility to run multiple queries in one. See the Integration Tests in `tests/integration_tests.rs` for examples.
- Ability to specify Timestamp for write queries

### Changed

- You now have to borrow a query when passing it to the `query` method

## [0.0.2] - 2019-07-23

### Changed

- URLEncode Query before sending it to InfluxDB, which caused some empty returns (#5)
- Improved Test Coverage: There's now even more tests verifying correctness of the crate (#5)
- It's no longer necessary to supply a wildcard generic when working with serde*integration: `client.json_query::<Weather>(query)` instead of `client.json_query::<Weather, *>(query)`

[unreleased]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.3...HEAD
[0.0.3]: https://github.com/Empty2k12/influxdb-rust/compare/v0.0.2...v0.0.3
[0.0.2]: https://github.com/Empty2k12/influxdb-rust/releases/tag/v0.0.2
514 changes: 284 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "influxdb"
version = "0.0.2"
version = "0.0.3"
authors = ["Gero Gerke <[email protected]>"]
edition = "2018"
description = "InfluxDB Driver for Rust"
Expand Down
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM xd009642/tarpaulin

RUN wget https://dl.influxdata.com/influxdb/releases/influxdb_1.7.6_amd64.deb
RUN dpkg -i influxdb_1.7.6_amd64.deb
RUN INFLUXDB_HTTP_BIND_ADDRESS=9999 influxd > $HOME/influx.log 2>&1 &

WORKDIR /volume

CMD cargo build && cargo tarpaulin
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,27 @@ Pull requests are always welcome.

- Reading and Writing to InfluxDB
- Optional Serde Support for Deserialization
- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)

## Planned Features

- Running multiple queries in one request (e.g. `SELECT * FROM weather_berlin; SELECT * FROM weather_london`)
- Read Query Builder instead of supplying raw queries
- Authentication against InfluxDB
- Methods for setting time and time precision in a query
- `#[derive(InfluxDbWritable)]`

## Quickstart

Add the following to your `Cargo.toml`

```toml
influxdb = "0.0.2"
influxdb = "0.0.3"
```

For an example with using Serde deserialization, please refer to [serde_integration](crate::integrations::serde_integration)

```rust
use influxdb::query::InfluxDbQuery;
use influxdb::query::{InfluxDbQuery, Timestamp};
use influxdb::client::InfluxDbClient;
use serde::Deserialize;
use tokio::runtime::current_thread::Runtime;

// Create a InfluxDbClient with URL `http://localhost:8086`
Expand All @@ -68,7 +67,7 @@ let client = InfluxDbClient::new("http://localhost:8086", "test");
// Let's write something to InfluxDB. First we're creating a
// InfluxDbWriteQuery to write some data.
// This creates a query which writes a new measurement into a series called `weather`
let write_query = InfluxDbQuery::write_query("weather")
let write_query = InfluxDbQuery::write_query(Timestamp::NOW, "weather")
.add_field("temperature", 82);

// Since this library is async by default, we're going to need a Runtime,
Expand All @@ -78,14 +77,14 @@ let mut rt = Runtime::new().expect("Unable to create a runtime");

// To actually submit the data to InfluxDB, the `block_on` method can be used to
// halt execution of our program until it has been completed.
let write_result = rt.block_on(client.query(write_query));
let write_result = rt.block_on(client.query(&write_query));
assert!(write_result.is_ok(), "Write result was not okay");

// Reading data is as simple as writing. First we need to create a query
let read_query = InfluxDbQuery::raw_read_query("SELECT _ FROM weather");

// Again, we're blocking until the request is done
let read_result = rt.block_on(client.query(read_query));
let read_result = rt.block_on(client.query(&read_query));

assert!(read_result.is_ok(), "Read result was not ok");

Expand All @@ -101,4 +100,4 @@ in the repository.

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

@ 2019 Gero Gerke, All rights reserved.
@ 2019 Gero Gerke, All rights reserved.
60 changes: 33 additions & 27 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use reqwest::r#async::{Client, Decoder};
use std::mem;

use crate::error::InfluxDbError;
use crate::query::{InfluxDbQuery, QueryType};
use crate::query::read_query::InfluxDbReadQuery;
use crate::query::write_query::InfluxDbWriteQuery;
use crate::query::InfluxDbQuery;

use url::form_urlencoded;

use std::any::Any;

/// Internal Representation of a Client
pub struct InfluxDbClient {
url: String,
Expand Down Expand Up @@ -108,21 +112,20 @@ impl InfluxDbClient {
///
/// ```rust
/// use influxdb::client::InfluxDbClient;
/// use influxdb::query::InfluxDbQuery;
/// use influxdb::query::{InfluxDbQuery, Timestamp};
///
/// let client = InfluxDbClient::new("http://localhost:8086", "test");
/// let _future = client.query(
/// InfluxDbQuery::write_query("weather")
/// &InfluxDbQuery::write_query(Timestamp::NOW, "weather")
/// .add_field("temperature", 82)
/// );
/// ```
pub fn query<Q>(&self, q: Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
pub fn query<Q>(&self, q: &Q) -> Box<dyn Future<Item = String, Error = InfluxDbError>>
where
Q: InfluxDbQuery,
Q: Any + InfluxDbQuery,
{
use futures::future;

let q_type = q.get_type();
let query = match q.build() {
Err(err) => {
let error = InfluxDbError::InvalidQueryError {
Expand All @@ -133,35 +136,38 @@ impl InfluxDbClient {
Ok(query) => query,
};

let client = match q_type {
QueryType::ReadQuery => {
let read_query = query.get();
let encoded: String = form_urlencoded::Serializer::new(String::new())
.append_pair("db", self.database_name())
.append_pair("q", &read_query)
.finish();
let http_query_string = format!(
"{url}/query?{encoded}",
url = self.database_url(),
encoded = encoded
);

if read_query.contains("SELECT") || read_query.contains("SHOW") {
Client::new().get(http_query_string.as_str())
} else {
Client::new().post(http_query_string.as_str())
}
let any_value = q as &dyn Any;

let client = if let Some(_) = any_value.downcast_ref::<InfluxDbReadQuery>() {
let read_query = query.get();
let encoded: String = form_urlencoded::Serializer::new(String::new())
.append_pair("db", self.database_name())
.append_pair("q", &read_query)
.finish();
let http_query_string = format!(
"{url}/query?{encoded}",
url = self.database_url(),
encoded = encoded
);
if read_query.contains("SELECT") || read_query.contains("SHOW") {
Client::new().get(http_query_string.as_str())
} else {
Client::new().post(http_query_string.as_str())
}
QueryType::WriteQuery => Client::new()
} else if let Some(write_query) = any_value.downcast_ref::<InfluxDbWriteQuery>() {
Client::new()
.post(
format!(
"{url}/write?db={db}",
"{url}/write?db={db}{precision_str}",
url = self.database_url(),
db = self.database_name(),
precision_str = write_query.get_precision_modifier()
)
.as_str(),
)
.body(query.get()),
.body(query.get())
} else {
unreachable!()
};

Box::new(
Expand Down
68 changes: 46 additions & 22 deletions src/integrations/serde_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
//! `name`, InfluxDB provides alongside query results.
//!
//! ```rust,no_run
//! use influxdb::query::InfluxDbQuery;
//! use futures::prelude::*;
//! use influxdb::client::InfluxDbClient;
//! use influxdb::query::InfluxDbQuery;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct WeatherWithoutCityName {
//! temperature: i32
//! temperature: i32,
//! }
//!
//! #[derive(Deserialize)]
Expand All @@ -24,16 +25,26 @@
//!
//! let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
//! let client = InfluxDbClient::new("http://localhost:8086", "test");
//! let query = InfluxDbQuery::raw_read_query("SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC");
//! let _result = rt.block_on(client.json_query::<WeatherWithoutCityName>(query))
//! let query = InfluxDbQuery::raw_read_query(
//! "SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC",
//! );
//! let _result = rt
//! .block_on(client.json_query(query))
//! .map(|mut db_result| db_result.deserialize_next::<WeatherWithoutCityName>())
//! .map(|it| {
//! it.map(|series_vec| {
//! series_vec
//! .series
//! .into_iter()
//! .map(|mut city_series| {
//! let city_name = city_series.name.split("_").collect::<Vec<&str>>().remove(2);
//! Weather { weather: city_series.values.remove(0), city_name: city_name.to_string() }
//! }).collect::<Vec<Weather>>()
//! let city_name =
//! city_series.name.split("_").collect::<Vec<&str>>().remove(2);
//! Weather {
//! weather: city_series.values.remove(0),
//! city_name: city_name.to_string(),
//! }
//! })
//! .collect::<Vec<Weather>>()
//! })
//! });
//! ```
Expand All @@ -56,6 +67,8 @@ use crate::query::InfluxDbQuery;

use url::form_urlencoded;

use futures::future::Either;

#[derive(Deserialize)]
#[doc(hidden)]
struct _DatabaseError {
Expand All @@ -64,14 +77,30 @@ struct _DatabaseError {

#[derive(Deserialize, Debug)]
#[doc(hidden)]
pub struct DatabaseQueryResult<T> {
pub results: Vec<InfluxDbReturn<T>>,
pub struct DatabaseQueryResult {
pub results: Vec<serde_json::Value>,
}

impl DatabaseQueryResult {
pub fn deserialize_next<T: 'static>(
&mut self,
) -> impl Future<Item = InfluxDbReturn<T>, Error = InfluxDbError>
where
T: DeserializeOwned,
{
match serde_json::from_value::<InfluxDbReturn<T>>(self.results.remove(0)) {
Ok(item) => futures::future::result(Ok(item)),
Err(err) => futures::future::err(InfluxDbError::DeserializationError {
error: format!("could not deserialize: {}", err),
}),
}
}
}

#[derive(Deserialize, Debug)]
#[doc(hidden)]
pub struct InfluxDbReturn<T> {
pub series: Option<Vec<InfluxDbSeries<T>>>,
pub series: Vec<InfluxDbSeries<T>>,
}

#[derive(Deserialize, Debug)]
Expand All @@ -82,13 +111,10 @@ pub struct InfluxDbSeries<T> {
}

impl InfluxDbClient {
pub fn json_query<T: 'static>(
pub fn json_query(
&self,
q: InfluxDbReadQuery,
) -> Box<dyn Future<Item = Option<Vec<InfluxDbSeries<T>>>, Error = InfluxDbError>>
where
T: DeserializeOwned,
{
) -> impl Future<Item = DatabaseQueryResult, Error = InfluxDbError> {
use futures::future;

let query = q.build().unwrap();
Expand All @@ -113,13 +139,11 @@ impl InfluxDbClient {
"Only SELECT and SHOW queries supported with JSON deserialization",
),
};
return Box::new(
future::err::<Option<Vec<InfluxDbSeries<T>>>, InfluxDbError>(error),
);
return Either::B(future::err::<DatabaseQueryResult, InfluxDbError>(error));
}
};

Box::new(
Either::A(
client
.send()
.and_then(|mut res| {
Expand All @@ -137,9 +161,9 @@ impl InfluxDbClient {
});
} else {
// Json has another structure, let's try actually parsing it to the type we're deserializing
let from_slice = serde_json::from_slice::<DatabaseQueryResult<T>>(&body);
let from_slice = serde_json::from_slice::<DatabaseQueryResult>(&body);

let mut deserialized = match from_slice {
let deserialized = match from_slice {
Ok(deserialized) => deserialized,
Err(err) => {
return futures::future::err(InfluxDbError::DeserializationError {
Expand All @@ -148,7 +172,7 @@ impl InfluxDbClient {
}
};

return futures::future::result(Ok(deserialized.results.remove(0).series));
return futures::future::result(Ok(deserialized));
}
}),
)
Expand Down
Loading