Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ these warning are not emitted if the value is set to 0 or less`,
`OperatorRPSRatio is the percentage of the rate limit provided to priority rate limiters that should be used for
operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20% if not specified)`,
)
// TODO: The following 2 configs should be removed once server keepalive and client keepalive are enabled by default
EnableInternodeServerKeepAlive = NewGlobalBoolSetting(
"system.enableInternodeServerKeepAlive",
false,
`enableInternodeServerKeepAlive is the config to enable keep alive for inter-node connections on server side.`,
)
EnableInternodeClientKeepAlive = NewGlobalBoolSetting(
"system.enableInternodeClientKeepAlive",
false,
`enableInternodeClientKeepAlive is the config to enable keep alive for inter-node connections on client side.`,
)

PersistenceQPSBurstRatio = NewGlobalFloatSetting(
"system.persistenceQPSBurstRatio",
1.0,
Expand Down
12 changes: 10 additions & 2 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type (
// See LifetimeHooksModule for detail
var Module = fx.Options(
persistenceClient.Module,
dynamicconfig.Module,
fx.Provide(HostNameProvider),
fx.Provide(TimeSourceProvider),
cluster.MetadataLifetimeHooksModule,
Expand Down Expand Up @@ -339,6 +340,7 @@ func RPCFactoryProvider(
resolver *membership.GRPCResolver,
tracingStatsHandler telemetry.ClientStatsHandler,
monitor membership.Monitor,
dc *dynamicconfig.Collection,
) (common.RPCFactory, error) {
frontendURL, frontendHTTPURL, frontendHTTPPort, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
if err != nil {
Expand All @@ -349,7 +351,9 @@ func RPCFactoryProvider(
if tracingStatsHandler != nil {
options = append(options, grpc.WithStatsHandler(tracingStatsHandler))
}
return rpc.NewFactory(
enableServerKeepalive := dynamicconfig.EnableInternodeServerKeepAlive.Get(dc)()
enableClientKeepalive := dynamicconfig.EnableInternodeClientKeepAlive.Get(dc)()
factory := rpc.NewFactory(
cfg,
svcName,
logger,
Expand All @@ -360,7 +364,11 @@ func RPCFactoryProvider(
frontendTLSConfig,
options,
monitor,
), nil
)
factory.EnableInternodeServerKeepalive = enableServerKeepalive
factory.EnableInternodeClientKeepalive = enableClientKeepalive
Comment on lines +368 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing you set these externally instead of passing them to NewFactory to avoid having to change all the tests? that's kind of messy but it's okay since it's just temporary.

Copy link
Contributor Author

@xwduan xwduan May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, and when removing, we don't have to deal with signature change again. I will add todo comments for that as well.

logger.Info(fmt.Sprintf("RPC factory created. enableServerKeepalive: %v, enableClientKeepalive: %v", enableServerKeepalive, enableClientKeepalive))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need this log line? I'd rather drop it unless you think it is very important for debugging (and then maybe debug level?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is temporary and I will remove it when fully rolled out. I want to make sure the dc change is taking effect when rolling out. I think debug level make sense.

return factory, nil
}

func FrontendHTTPClientCacheProvider(
Expand Down
29 changes: 22 additions & 7 deletions common/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"crypto/tls"
"fmt"
"math"
"math/rand"
"net"
"net/http"
Expand All @@ -23,6 +24,7 @@ import (
"go.temporal.io/server/temporal/environment"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

var _ common.RPCFactory = (*RPCFactory)(nil)
Expand All @@ -45,6 +47,9 @@ type RPCFactory struct {
// A OnceValues wrapper for createLocalFrontendHTTPClient.
localFrontendClient func() (*common.FrontendHTTPClient, error)
interNodeGrpcConnections cache.Cache

EnableInternodeServerKeepalive bool
EnableInternodeClientKeepalive bool
}

// NewFactory builds a new RPCFactory
Expand Down Expand Up @@ -115,6 +120,12 @@ func (d *RPCFactory) GetRemoteClusterClientConfig(hostname string) (*tls.Config,
func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption

if d.EnableInternodeServerKeepalive {
rpcConfig := d.config.Services[string(d.serviceName)].RPC
kep := rpcConfig.KeepAliveServerConfig.GetKeepAliveEnforcementPolicy()
kp := rpcConfig.KeepAliveServerConfig.GetKeepAliveServerParameters()
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kep), grpc.KeepaliveParams(kp))
}
if d.tlsFactory != nil {
serverConfig, err := d.tlsFactory.GetInternodeServerConfig()
if err != nil {
Expand All @@ -126,11 +137,6 @@ func (d *RPCFactory) GetInternodeGRPCServerOptions() ([]grpc.ServerOption, error
opts = append(opts, grpc.Creds(credentials.NewTLS(serverConfig)))
}

rpcConfig := d.config.Services[string(d.serviceName)].RPC
kep := rpcConfig.KeepAliveServerConfig.GetKeepAliveEnforcementPolicy()
kp := rpcConfig.KeepAliveServerConfig.GetKeepAliveServerParameters()
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kep), grpc.KeepaliveParams(kp))

return opts, nil
}

Expand Down Expand Up @@ -251,8 +257,17 @@ func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config, dialOpti
}

func (d *RPCFactory) getClientKeepAliveConfig(serviceName primitives.ServiceName) grpc.DialOption {
serviceConfig := d.config.Services[string(serviceName)]
return grpc.WithKeepaliveParams(serviceConfig.RPC.ClientConnectionConfig.GetKeepAliveClientParameters())
// default keepalive settings for clients
params := keepalive.ClientParameters{
Time: time.Duration(math.MaxInt64),
Timeout: 20 * time.Second,
PermitWithoutStream: false,
}
if d.EnableInternodeClientKeepalive {
serviceConfig := d.config.Services[string(serviceName)]
params = serviceConfig.RPC.ClientConnectionConfig.GetKeepAliveClientParameters()
}
return grpc.WithKeepaliveParams(params)
}

func (d *RPCFactory) GetTLSConfigProvider() encryption.TLSConfigProvider {
Expand Down
1 change: 0 additions & 1 deletion service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type (
var Module = fx.Options(
resource.Module,
scheduler.Module,
dynamicconfig.Module,
deployment.Module,
workerdeployment.Module,
// Note that with this approach routes may be registered in arbitrary order.
Expand Down
1 change: 0 additions & 1 deletion service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ var Module = fx.Options(
events.Module,
cache.Module,
archival.Module,
dynamicconfig.Module,
fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly
fx.Provide(workflow.NewCommandHandlerRegistry),
fx.Provide(RetryableInterceptorProvider),
Expand Down
1 change: 0 additions & 1 deletion service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

var Module = fx.Options(
resource.Module,
dynamicconfig.Module,
deployment.Module,
workerdeployment.Module,
fx.Provide(ConfigProvider),
Expand Down
1 change: 0 additions & 1 deletion service/worker/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var Module = fx.Options(
deployment.Module, // [cleanup-wv-pre-release]
workerdeployment.Module,
dlq.Module,
dynamicconfig.Module,
fx.Provide(
func(c resource.HistoryClient) dlq.HistoryClient {
return c
Expand Down
Loading