Skip to content

[WIP] Add gRPC connection pooling for distributor->ingester connections. #1160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion pkg/chunk/gcp/bigtable_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"cloud.google.com/go/bigtable"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"google.golang.org/api/option"

"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/pkg/errors"
)

Expand All @@ -30,12 +32,21 @@ const (
type Config struct {
Project string `yaml:"project"`
Instance string `yaml:"instance"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

ColumnKey bool
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Project, "bigtable.project", "", "Bigtable project ID.")
f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.")

cfg.GRPCClientConfig.RegisterFlags("bigtable", f)

// Deprecated.
f.Int("bigtable.max-recv-msg-size", 100<<20, "DEPRECATED. Bigtable grpc max receive message size.")
}

// storageClientColumnKey implements chunk.storageClient for GCP.
Expand All @@ -53,7 +64,12 @@ type storageClientV1 struct {

// NewStorageClientV1 returns a new v1 StorageClient.
func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...)
opts := instrumentation()
for _, dialOption := range cfg.GRPCClientConfig.DialOptions() {
opts = append(opts, option.WithGRPCDialOption(dialOption))
}

client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
if err != nil {
return nil, err
}
Expand Down
24 changes: 9 additions & 15 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware"
"github.com/weaveworks/common/middleware"
)
Expand Down Expand Up @@ -49,14 +50,8 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)),
}
if cfg.legacyCompressToIngester {
cfg.CompressToIngester = true
}
if cfg.CompressToIngester {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
}
opts = append(opts, cfg.GRPCClientConfig.DialOptions()...)
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
Expand All @@ -74,16 +69,15 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client
type Config struct {
MaxRecvMsgSize int
CompressToIngester bool
legacyCompressToIngester bool
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// We have seen 20MB returns from queries - add a bit of headroom
f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.")
f.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.")
// moved from distributor pkg, but flag prefix left as back compat fallback for existing users.
f.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
cfg.GRPCClientConfig.RegisterFlags("ingester.client", f)

// Deprecated.
f.Int("ingester.client.max-recv-message-size", 64*1024*1024, "DEPRECATED. Maximum message size, in bytes, this client will receive.")
f.Bool("ingester.client.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters.")
f.Bool("distributor.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead")
}
16 changes: 11 additions & 5 deletions pkg/querier/frontend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/naming"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
Expand All @@ -31,13 +32,17 @@ type WorkerConfig struct {
Address string
Parallelism int
DNSLookupDuration time.Duration

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.")
f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.")
f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.")

cfg.GRPCClientConfig.RegisterFlags("querier.frontend-client", f)
}

// Worker is the counter-part to the frontend, actually processing requests.
Expand Down Expand Up @@ -142,7 +147,7 @@ func (w *worker) watchDNSLoop() {

// runMany starts N runOne loops for a given address.
func (w *worker) runMany(ctx context.Context, address string) {
client, err := connect(address)
client, err := w.connect(address)
if err != nil {
level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err)
return
Expand Down Expand Up @@ -206,14 +211,15 @@ func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error {
}
}

func connect(address string) (FrontendClient, error) {
conn, err := grpc.Dial(
address,
func (w *worker) connect(address string) (FrontendClient, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
middleware.ClientUserHeaderInterceptor,
)),
)
}
opts = append(opts, w.cfg.GRPCClientConfig.DialOptions()...)
conn, err := grpc.Dial(address, opts...)
if err != nil {
return nil, err
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package grpcclient

import (
"flag"

"google.golang.org/grpc"
)

// Config for a gRPC client.
type Config struct {
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
UseGzipCompression bool `yaml:"use_gzip_compression"`
ConnectionPoolSize int `yaml:"connect_pool_size"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
f.IntVar(&cfg.ConnectionPoolSize, prefix+".grpc-connection-pool-size", 1, "Number of connections to keep active per endpoint.")
}

// CallOptions returns the config in terms of CallOptions.
func (cfg *Config) CallOptions() []grpc.CallOption {
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize))
opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize))
if cfg.UseGzipCompression {
opts = append(opts, grpc.UseCompressor("gzip"))
}
return opts
}

// DialOptions returns the config as a grpc.DialOptions.
func (cfg *Config) DialOptions() []grpc.DialOption {
result := []grpc.DialOption{grpc.WithDefaultCallOptions(cfg.CallOptions()...)}
if cfg.ConnectionPoolSize > 1 {
result = append(result, grpc.WithBalancer(grpc.RoundRobin(NewPoolResolver(cfg.ConnectionPoolSize))))
}
return result
}
53 changes: 53 additions & 0 deletions pkg/util/grpcclient/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package grpcclient

import (
"google.golang.org/grpc/naming"
)

type poolResolver struct {
size int
next naming.Resolver
}

// NewPoolResolver returns a PoolResolver.
func NewPoolResolver(size int) naming.Resolver {
resolver, _ := naming.NewDNSResolver()
return &poolResolver{
size: size,
next: resolver,
}
}

func (r *poolResolver) Resolve(target string) (naming.Watcher, error) {
watcher, err := r.next.Resolve(target)
if err != nil {
return nil, err
}

return &poolWatcher{
size: r.size,
next: watcher,
}, nil
}

type poolWatcher struct {
size int
next naming.Watcher
}

func (w *poolWatcher) Next() ([]*naming.Update, error) {
updates, err := w.next.Next()
if err != nil {
return nil, err
}

result := make([]*naming.Update, len(updates)*w.size)
for i := 0; i < len(updates)*w.size; i++ {
result[i] = updates[i/w.size]
}
return result, nil
}

func (w *poolWatcher) Close() {
w.next.Close()
}