Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::trace::TraceLayer;

use self::authorize::HttpAuth;
use self::influxdb::influxdb_write;
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
Expand Down Expand Up @@ -88,6 +88,8 @@ pub(crate) fn query_context_from_db(
pub const HTTP_API_VERSION: &str = "v1";
pub const HTTP_API_PREFIX: &str = "/v1/";

pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
Comment thread
fengys1996 marked this conversation as resolved.

pub struct HttpServer {
sql_handler: ServerSqlQueryHandlerRef,
options: HttpOptions,
Expand Down Expand Up @@ -492,6 +494,8 @@ impl HttpServer {
fn route_influxdb<S>(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/write", routing::post(influxdb_write))
.route("/ping", routing::get(influxdb_ping))
.route("/health", routing::get(influxdb_health))
.with_state(influxdb_handler)
}

Expand Down
43 changes: 41 additions & 2 deletions src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -12,6 +11,8 @@ use std::collections::HashMap;
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::marker::PhantomData;

use axum::http::{self, Request, StatusCode};
Expand All @@ -23,6 +24,7 @@ use session::context::UserInfo;
use snafu::{OptionExt, ResultExt};
use tower_http::auth::AsyncAuthorizeRequest;

use super::PUBLIC_APIS;
use crate::auth::Error::IllegalParam;
use crate::auth::{Identity, IllegalParamSnafu, InternalStateSnafu, UserProviderRef};
use crate::error::{self, Result};
Expand Down Expand Up @@ -63,7 +65,8 @@ where
fn authorize(&mut self, mut request: Request<B>) -> Self::Future {
let user_provider = self.user_provider.clone();
Box::pin(async move {
let need_auth = request.uri().path().starts_with(HTTP_API_PREFIX);
let need_auth = need_auth(&request);

let user_provider = if let Some(user_provider) = user_provider.filter(|_| need_auth) {
user_provider
} else {
Expand Down Expand Up @@ -209,10 +212,46 @@ fn decode_basic(credential: Credential) -> Result<(Username, Password)> {
error::InvalidAuthorizationHeaderSnafu {}.fail()
}

fn need_auth<B>(req: &Request<B>) -> bool {
let path = req.uri().path();

for api in PUBLIC_APIS {
if path.starts_with(api) {
return false;
}
}

path.starts_with(HTTP_API_PREFIX)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_need_auth() {
let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/ping")
.body(())
.unwrap();

assert!(!need_auth(&req));

let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/health")
.body(())
.unwrap();

assert!(!need_auth(&req));

let req = Request::builder()
.uri("http://127.0.0.1/v1/influxdb/write")
.body(())
.unwrap();

assert!(need_auth(&req));
}

#[test]
fn test_decode_basic() {
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
Expand Down
15 changes: 14 additions & 1 deletion src/servers/src/http/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::writer::Precision;
use session::context::QueryContext;
Expand All @@ -25,12 +26,24 @@ use crate::error::{Result, TimePrecisionSnafu};
use crate::influxdb::InfluxdbRequest;
use crate::query_handler::InfluxdbLineProtocolHandlerRef;

// https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint
#[axum_macros::debug_handler]
pub async fn influxdb_ping() -> Result<impl IntoResponse> {
Ok(StatusCode::NO_CONTENT)
}

// https://docs.influxdata.com/influxdb/v1.8/tools/api/#health-http-endpoint
#[axum_macros::debug_handler]
pub async fn influxdb_health() -> Result<impl IntoResponse> {
Ok(StatusCode::OK)
}

#[axum_macros::debug_handler]
pub async fn influxdb_write(
State(handler): State<InfluxdbLineProtocolHandlerRef>,
Query(mut params): Query<HashMap<String, String>>,
lines: String,
) -> Result<(StatusCode, ())> {
) -> Result<impl IntoResponse> {
let db = params
.remove("db")
.unwrap_or_else(|| DEFAULT_SCHEMA_NAME.to_string());
Expand Down
6 changes: 6 additions & 0 deletions src/servers/tests/http/influxdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ async fn test_influxdb_write() {
let app = make_test_app(tx.clone(), None);
let client = TestClient::new(app);

let result = client.get("/v1/influxdb/health").send().await;
assert_eq!(result.status(), 200);

let result = client.get("/v1/influxdb/ping").send().await;
assert_eq!(result.status(), 204);

// right request
let result = client
.post("/v1/influxdb/write?db=public")
Expand Down