-
Notifications
You must be signed in to change notification settings - Fork 3.9k
feat(gRPC): build gRPC client interface to initiate communication with recovery-decider service #8178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(gRPC): build gRPC client interface to initiate communication with recovery-decider service #8178
Changes from 147 commits
809da65
8f160b0
bd411f5
2e02b84
fd47398
4a82642
dd531f9
01d98a0
653cd5d
6cae8e1
787912a
a5c5d9a
da18869
f5d5294
826202a
d03c564
24dac12
a4262c7
71ec2bc
3b04911
44e5a18
75ef438
c53b310
0479693
4227dfb
69d5d24
6764550
2fe86f9
3142502
49f7a22
b4c7e71
a9899b5
e93d5a7
5aaaea6
da5a505
d75b207
c9f7aa6
9bebc57
86c9715
272c416
7378787
23ed82c
23e13fa
9e30a2a
eaf1e40
b154311
d2e3caa
1110062
7170f7a
79d8e64
964fd78
fda03ea
3e619b7
054bf7f
f916cb5
f0fe9d5
1fdab57
e1483a3
264766f
738037d
167dccd
720b422
86a63b6
0a516f6
4c0503d
48214ee
1b619b3
ae153fb
8d78311
0177ba4
fe2af2e
77b750d
32a74f9
4386261
a4aac2a
1bdedf7
98aeceb
f478efb
38378f9
d4b6025
b7fc94e
ad3beaa
c6ed016
49e68b5
35eae6a
9770ff6
60d745a
bd1913e
f89bf01
88671ca
576b4ab
5e7297d
4c50e55
71e0b4e
3d21400
ffd75fe
2b1d402
401c7b5
aa325bd
feb3f88
89405fb
5cead6b
d62ba13
5de0d97
1b768bf
0c01259
4edb072
f66a288
dfdb3e3
51deffb
c26bded
77bcbd5
3aed65b
dea1639
fa5f1ce
9ecd485
3eae00b
a9d90a1
060a33a
273acd1
0536d0e
abf8d50
8dd5919
baeb245
7e81472
920f211
20a864f
2bb5234
4b269f1
26fee44
ff53383
f791524
3818225
5da1854
01aaa59
553f094
da3304a
e810940
baee0b3
99daa29
560a72b
7bd8144
45435cc
59f5a77
bdddba9
4e19f8f
d973789
bbdc2a6
c9031ce
ee1735d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1148,12 +1148,18 @@ url = "http://localhost:8080" # Open Router URL | |
base_url = "http://localhost:8000" # Unified Connector Service Base URL | ||
connection_timeout = 10 # Connection Timeout Duration in Seconds | ||
|
||
[grpc_client.recovery_decider_client] # Revenue recovery client base url | ||
base_url = "http://127.0.0.1:8080" #Base URL | ||
|
||
[billing_connectors_invoice_sync] | ||
billing_connectors_which_requires_invoice_sync_call = "recurly" # List of billing connectors which has invoice sync api call | ||
|
||
[revenue_recovery] | ||
monitoring_threshold_in_seconds = 2592000 # 30*24*60*60 secs , threshold for monitoring the retry system | ||
retry_algorithm_type = "cascading" # type of retry algorithm | ||
monitoring_threshold_in_seconds = 60 # 30*24*60*60 secs , threshold for monitoring the retry system | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The number (60) and the comment don't match? |
||
retry_algorithm_type = "cascading" # type of retry algorithm | ||
|
||
[revenue_recovery.recovery_timestamp] # Timestamp configuration for Revenue Recovery | ||
initial_timestamp_in_hours = 1 # number of hours added to start time for Decider service of Revenue Recovery | ||
|
||
[clone_connector_allowlist] | ||
merchant_ids = "merchant_ids" # Comma-separated list of allowed merchant IDs | ||
|
SanchithHegde marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,10 @@ pub mod dynamic_routing; | |
/// gRPC based Heath Check Client interface implementation | ||
#[cfg(feature = "dynamic_routing")] | ||
pub mod health_check_client; | ||
/// gRPC based Recovery Trainer Client interface implementation | ||
#[cfg(feature = "revenue_recovery")] | ||
pub mod revenue_recovery; | ||
|
||
/// gRPC based Unified Connector Service Client interface implementation | ||
pub mod unified_connector_service; | ||
use std::{fmt::Debug, sync::Arc}; | ||
|
@@ -14,19 +18,26 @@ use common_utils::consts; | |
use dynamic_routing::{DynamicRoutingClientConfig, RoutingStrategy}; | ||
#[cfg(feature = "dynamic_routing")] | ||
use health_check_client::HealthCheckClient; | ||
#[cfg(feature = "dynamic_routing")] | ||
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))] | ||
use hyper_util::client::legacy::connect::HttpConnector; | ||
#[cfg(feature = "dynamic_routing")] | ||
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))] | ||
use router_env::logger; | ||
use serde; | ||
#[cfg(feature = "dynamic_routing")] | ||
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))] | ||
use tonic::body::Body; | ||
|
||
#[cfg(feature = "revenue_recovery")] | ||
pub use self::revenue_recovery::{ | ||
recovery_decider_client::{ | ||
DeciderRequest, DeciderResponse, RecoveryDeciderClientConfig, | ||
RecoveryDeciderClientInterface, RecoveryDeciderError, RecoveryDeciderResult, | ||
}, | ||
GrpcRecoveryHeaders, | ||
}; | ||
use crate::grpc_client::unified_connector_service::{ | ||
UnifiedConnectorServiceClient, UnifiedConnectorServiceClientConfig, | ||
}; | ||
|
||
#[cfg(feature = "dynamic_routing")] | ||
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))] | ||
/// Hyper based Client type for maintaining connection pool for all gRPC services | ||
pub type Client = hyper_util::client::legacy::Client<HttpConnector, Body>; | ||
|
||
|
@@ -39,15 +50,22 @@ pub struct GrpcClients { | |
/// Health Check client for all gRPC services | ||
#[cfg(feature = "dynamic_routing")] | ||
pub health_client: HealthCheckClient, | ||
/// Recovery Decider Client | ||
#[cfg(feature = "revenue_recovery")] | ||
pub recovery_decider_client: Option<Box<dyn RecoveryDeciderClientInterface>>, | ||
/// Unified Connector Service client | ||
pub unified_connector_service_client: Option<UnifiedConnectorServiceClient>, | ||
} | ||
|
||
/// Type that contains the configs required to construct a gRPC client with its respective services. | ||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize, Default)] | ||
pub struct GrpcClientSettings { | ||
#[cfg(feature = "dynamic_routing")] | ||
/// Configs for Dynamic Routing Client | ||
pub dynamic_routing_client: Option<DynamicRoutingClientConfig>, | ||
#[cfg(feature = "revenue_recovery")] | ||
/// Configs for Recovery Decider Client | ||
pub recovery_decider_client: Option<RecoveryDeciderClientConfig>, | ||
/// Configs for Unified Connector Service client | ||
pub unified_connector_service: Option<UnifiedConnectorServiceClientConfig>, | ||
} | ||
|
@@ -59,7 +77,7 @@ impl GrpcClientSettings { | |
/// This function will be called at service startup. | ||
#[allow(clippy::expect_used)] | ||
pub async fn get_grpc_client_interface(&self) -> Arc<GrpcClients> { | ||
#[cfg(feature = "dynamic_routing")] | ||
#[cfg(any(feature = "dynamic_routing", feature = "revenue_recovery"))] | ||
let client = | ||
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new()) | ||
.http2_only(true) | ||
|
@@ -75,18 +93,48 @@ impl GrpcClientSettings { | |
.flatten(); | ||
|
||
#[cfg(feature = "dynamic_routing")] | ||
let health_client = HealthCheckClient::build_connections(self, client) | ||
let health_client = HealthCheckClient::build_connections(self, client.clone()) | ||
.await | ||
.expect("Failed to build gRPC connections"); | ||
|
||
let unified_connector_service_client = | ||
UnifiedConnectorServiceClient::build_connections(self).await; | ||
|
||
#[cfg(feature = "revenue_recovery")] | ||
let recovery_decider_client = { | ||
match &self.recovery_decider_client { | ||
Some(config) => { | ||
// Validate the config first | ||
config | ||
.validate() | ||
.expect("Recovery Decider configuration validation failed"); | ||
|
||
// Create the client | ||
let client = config | ||
.get_recovery_decider_connection(client.clone()) | ||
.expect( | ||
"Failed to establish a connection with the Recovery Decider Server", | ||
); | ||
|
||
logger::info!("Recovery Decider gRPC client successfully initialized"); | ||
// Some(Box::new(client) as Box<dyn RecoveryDeciderClientInterface>) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove commented code. |
||
let boxed_client: Box<dyn RecoveryDeciderClientInterface> = Box::new(client); | ||
Some(boxed_client) | ||
} | ||
None => { | ||
logger::debug!("Recovery Decider client configuration not provided, client will be disabled"); | ||
None | ||
} | ||
} | ||
}; | ||
|
||
Arc::new(GrpcClients { | ||
#[cfg(feature = "dynamic_routing")] | ||
dynamic_routing: dynamic_routing_connection, | ||
#[cfg(feature = "dynamic_routing")] | ||
health_client, | ||
#[cfg(feature = "revenue_recovery")] | ||
recovery_decider_client, | ||
unified_connector_service_client, | ||
}) | ||
} | ||
|
@@ -145,7 +193,7 @@ pub(crate) fn create_grpc_request<T: Debug>(message: T, headers: GrpcHeaders) -> | |
let mut request = tonic::Request::new(message); | ||
request.add_headers_to_grpc_request(headers); | ||
|
||
logger::info!(dynamic_routing_request=?request); | ||
logger::info!(?request); | ||
|
||
request | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/// Recovery Decider client | ||
pub mod recovery_decider_client; | ||
|
||
use std::fmt::Debug; | ||
|
||
use common_utils::consts; | ||
use router_env::logger; | ||
|
||
/// Contains recovery grpc headers | ||
#[derive(Debug)] | ||
pub struct GrpcRecoveryHeaders { | ||
/// Request id | ||
pub request_id: Option<String>, | ||
} | ||
|
||
/// Trait to add necessary recovery headers to the tonic Request | ||
pub(crate) trait AddRecoveryHeaders { | ||
/// Add necessary recovery header fields to the tonic Request | ||
fn add_recovery_headers(&mut self, headers: GrpcRecoveryHeaders); | ||
} | ||
|
||
impl<T> AddRecoveryHeaders for tonic::Request<T> { | ||
#[track_caller] | ||
fn add_recovery_headers(&mut self, headers: GrpcRecoveryHeaders) { | ||
headers.request_id.map(|request_id| { | ||
request_id | ||
.parse() | ||
.map(|request_id_val| { | ||
self | ||
.metadata_mut() | ||
.append(consts::X_REQUEST_ID, request_id_val) | ||
}) | ||
.inspect_err( | ||
|err| logger::warn!(header_parse_error=?err,"invalid {} received",consts::X_REQUEST_ID), | ||
) | ||
.ok(); | ||
}); | ||
} | ||
} | ||
|
||
/// Creates a tonic::Request with recovery headers added. | ||
pub(crate) fn create_revenue_recovery_grpc_request<T: Debug>( | ||
message: T, | ||
recovery_headers: GrpcRecoveryHeaders, | ||
) -> tonic::Request<T> { | ||
let mut request = tonic::Request::new(message); | ||
request.add_recovery_headers(recovery_headers); | ||
request | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number and the comment don't match?