Skip to content

refactor(otlp_metric): make otlp metric compatible with promql #6543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::otlp::OtlpMetricOptions;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::query_handler::{OpenTelemetryProtocolHandler, PipelineHandlerRef};
Expand All @@ -37,6 +40,7 @@ impl OpenTelemetryProtocolHandler for Instance {
async fn metrics(
&self,
request: ExportMetricsServiceRequest,
metric_options: OtlpMetricOptions,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
Expand All @@ -50,7 +54,8 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;

let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let (requests, rows) =
otlp::metrics::to_grpc_insert_requests(request, metric_options.legacy_mode)?;
OTLP_METRICS_ROWS.inc_by(rows as u64);

let _guard = if let Some(limiter) = &self.limiter {
Expand All @@ -63,10 +68,21 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};

self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
if metric_options.legacy_mode || !metric_options.with_metric_engine {
self.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
} else {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
self.handle_metric_row_inserts(requests, ctx, physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}

#[tracing::instrument(skip_all)]
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ where
}

if opts.otlp.enable {
builder = builder.with_otlp_handler(self.instance.clone());
builder = builder
.with_otlp_handler(self.instance.clone(), opts.prom_store.with_metric_engine);
}

if opts.jaeger.enable {
Expand Down Expand Up @@ -158,7 +159,7 @@ where
let grpc_server = builder
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler(self.instance.clone()))
.otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.frontend_grpc_handler(frontend_grpc_handler)
.build();
Expand Down
19 changes: 15 additions & 4 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::error::{
ToJsonSnafu,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::otlp::OtlpState;
use crate::http::prom_store::PromStoreState;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
Expand Down Expand Up @@ -631,11 +632,15 @@ impl HttpServerBuilder {
}
}

pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self {
pub fn with_otlp_handler(
self,
handler: OpenTelemetryProtocolHandlerRef,
with_metric_engine: bool,
) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/otlp"),
HttpServer::route_otlp(handler),
HttpServer::route_otlp(handler, with_metric_engine),
),
..self
}
Expand Down Expand Up @@ -1100,7 +1105,10 @@ impl HttpServer {
.with_state(opentsdb_handler)
}

fn route_otlp<S>(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
fn route_otlp<S>(
otlp_handler: OpenTelemetryProtocolHandlerRef,
with_metric_engine: bool,
) -> Router<S> {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
Expand All @@ -1109,7 +1117,10 @@ impl HttpServer {
ServiceBuilder::new()
.layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)),
)
.with_state(otlp_handler)
.with_state(OtlpState {
with_metric_engine,
handler: otlp_handler,
})
}

fn route_config<S>(state: GreptimeOptionsConfigState) -> Router<S> {
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod constants {
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
pub const GREPTIME_METRICS_LEGACY_MODE_HEADER_NAME: &str = "x-greptime-metrics-legacy-mode";

/// The header key that contains the pipeline params.
pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params";
Expand Down
45 changes: 40 additions & 5 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use axum::Extension;
use bytes::Bytes;
use common_catalog::consts::{TRACE_TABLE_NAME, TRACE_TABLE_NAME_SESSION_KEY};
use common_telemetry::tracing;
use http::HeaderMap;
use opentelemetry_proto::tonic::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
};
Expand All @@ -37,16 +38,31 @@ use snafu::prelude::*;

use crate::error::{self, PipelineSnafu, Result};
use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
use crate::http::header::constants::GREPTIME_METRICS_LEGACY_MODE_HEADER_NAME;
use crate::http::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
use crate::query_handler::{OpenTelemetryProtocolHandlerRef, PipelineHandler};

/// Router state for the OTLP HTTP endpoints.
///
/// The `metrics`, `traces` and `logs` handlers in this module all extract this
/// state; only `metrics` reads `with_metric_engine`, the other two use just
/// `handler`.
#[derive(Clone)]
pub struct OtlpState {
/// Forwarded into [`OtlpMetricOptions::with_metric_engine`] for every
/// metrics request.
pub with_metric_engine: bool,
/// The protocol handler that the HTTP handlers delegate ingestion to.
pub handler: OpenTelemetryProtocolHandlerRef,
}

/// Per-request options passed alongside an OTLP metrics export request to the
/// protocol handler's `metrics` method.
#[derive(Clone)]
pub struct OtlpMetricOptions {
/// Use the legacy OTLP metrics processing.
///
/// The struct itself has no default. The HTTP `metrics` handler in this
/// module sets this to `true` when the `x-greptime-metrics-legacy-mode`
/// header is absent or is not valid UTF-8, which keeps backward
/// compatibility; any header value other than the exact string `"true"`
/// yields `false`.
pub legacy_mode: bool,
/// Copied from [`OtlpState::with_metric_engine`] by the HTTP `metrics`
/// handler.
// NOTE(review): presumably selects the metric-engine write path when
// `legacy_mode` is false; confirm against the handler implementation.
pub with_metric_engine: bool,
}

#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "metrics"))]
pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
State(state): State<OtlpState>,
Extension(mut query_ctx): Extension<QueryContext>,

headers: HeaderMap,
bytes: Bytes,
) -> Result<OtlpResponse<ExportMetricsServiceResponse>> {
let db = query_ctx.get_db_string();
Expand All @@ -58,8 +74,23 @@ pub async fn metrics(
let request =
ExportMetricsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;

let OtlpState {
with_metric_engine,
handler,
} = state;

let legacy_mode = headers
.get(GREPTIME_METRICS_LEGACY_MODE_HEADER_NAME)
.map(|v| v.to_str().unwrap_or("true") == "true")
.unwrap_or(true);

let metric_options = OtlpMetricOptions {
legacy_mode,
with_metric_engine,
};

handler
.metrics(request, query_ctx)
.metrics(request, metric_options, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportMetricsServiceResponse {
Expand All @@ -72,7 +103,7 @@ pub async fn metrics(
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
State(state): State<OtlpState>,
TraceTableName(table_name): TraceTableName,
pipeline_info: PipelineInfo,
Extension(mut query_ctx): Extension<QueryContext>,
Expand Down Expand Up @@ -100,6 +131,8 @@ pub async fn traces(

let pipeline_params = pipeline_info.pipeline_params;

let OtlpState { handler, .. } = state;

// here we use nightly feature `trait_upcasting` to convert handler to
// pipeline_handler
let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
Expand All @@ -125,7 +158,7 @@ pub async fn traces(
#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
pub async fn logs(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
State(state): State<OtlpState>,
Extension(mut query_ctx): Extension<QueryContext>,
pipeline_info: PipelineInfo,
LogTableName(tablename): LogTableName,
Expand All @@ -149,6 +182,8 @@ pub async fn logs(
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;

let OtlpState { handler, .. } = state;

// here we use nightly feature `trait_upcasting` to convert handler to
// pipeline_handler
let pipeline_handler: Arc<dyn PipelineHandler + Send + Sync> = handler.clone();
Expand Down
8 changes: 7 additions & 1 deletion src/servers/src/otel_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tonic::service::Interceptor;
use tonic::{Request, Response, Status, Streaming};

use crate::error;
use crate::http::otlp::OtlpMetricOptions;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

pub struct OtelArrowServiceHandler<T>(pub T);
Expand Down Expand Up @@ -85,7 +86,12 @@ impl ArrowMetricsService for OtelArrowServiceHandler<OpenTelemetryProtocolHandle
return;
}
};
if let Err(e) = handler.metrics(request, query_context.clone()).await {
// use metric engine by default
let opts = OtlpMetricOptions {
legacy_mode: false,
with_metric_engine: true,
};
if let Err(e) = handler.metrics(request, opts, query_context.clone()).await {
let _ = sender
.send(Err(Status::new(
status_to_tonic_code(e.status_code()),
Expand Down
Loading
Loading