diff --git a/pkg/chunk/gcp/bigtable_index_client.go b/pkg/chunk/gcp/bigtable_index_client.go index 570ce99d1a..6bbf17e3fa 100644 --- a/pkg/chunk/gcp/bigtable_index_client.go +++ b/pkg/chunk/gcp/bigtable_index_client.go @@ -10,10 +10,12 @@ import ( "cloud.google.com/go/bigtable" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + "google.golang.org/api/option" "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/pkg/errors" ) @@ -30,12 +32,21 @@ const ( type Config struct { Project string `yaml:"project"` Instance string `yaml:"instance"` + + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + + ColumnKey bool } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Project, "bigtable.project", "", "Bigtable project ID.") f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.") + + cfg.GRPCClientConfig.RegisterFlags("bigtable", f) + + // Deprecated. + f.Int("bigtable.max-recv-msg-size", 100<<20, "DEPRECATED. Bigtable grpc max receive message size.") } // storageClientColumnKey implements chunk.storageClient for GCP. @@ -53,7 +64,12 @@ type storageClientV1 struct { // NewStorageClientV1 returns a new v1 StorageClient. func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := instrumentation() + for _, dialOption := range cfg.GRPCClientConfig.DialOptions() { + opts = append(opts, option.WithGRPCDialOption(dialOption)) + } + + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 509e0fd534..48650190d4 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -12,6 +12,7 @@ import ( _ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered "google.golang.org/grpc/health/grpc_health_v1" + "github.com/cortexproject/cortex/pkg/util/grpcclient" cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware" "github.com/weaveworks/common/middleware" ) @@ -49,14 +50,8 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error middleware.StreamClientUserHeaderInterceptor, cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration), )), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)), - } - if cfg.legacyCompressToIngester { - cfg.CompressToIngester = true - } - if cfg.CompressToIngester { - opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) } + opts = append(opts, cfg.GRPCClientConfig.DialOptions()...) conn, err := grpc.Dial(addr, opts...) if err != nil { return nil, err @@ -74,16 +69,15 @@ func (c *closableHealthAndIngesterClient) Close() error { // Config is the configuration struct for the ingester client type Config struct { - MaxRecvMsgSize int - CompressToIngester bool - legacyCompressToIngester bool + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } // RegisterFlags registers configuration settings used by the ingester client config. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - // We have seen 20MB returns from queries - add a bit of headroom - f.IntVar(&cfg.MaxRecvMsgSize, "ingester.client.max-recv-message-size", 64*1024*1024, "Maximum message size, in bytes, this client will receive.") - f.BoolVar(&cfg.CompressToIngester, "ingester.client.compress-to-ingester", false, "Compress data in calls to ingesters.") - // moved from distributor pkg, but flag prefix left as back compat fallback for existing users. - f.BoolVar(&cfg.legacyCompressToIngester, "distributor.compress-to-ingester", false, "Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead") + cfg.GRPCClientConfig.RegisterFlags("ingester.client", f) + + // Deprecated. + f.Int("ingester.client.max-recv-message-size", 64*1024*1024, "DEPRECATED. Maximum message size, in bytes, this client will receive.") + f.Bool("ingester.client.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters.") + f.Bool("distributor.compress-to-ingester", false, "DEPRECATED. Compress data in calls to ingesters. (DEPRECATED: use ingester.client.compress-to-ingester instead") } diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index a2f9fa0ba8..55ff3412f8 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/naming" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" @@ -31,6 +32,8 @@ type WorkerConfig struct { Address string Parallelism int DNSLookupDuration time.Duration + + GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -38,6 +41,8 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service.") f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") + + cfg.GRPCClientConfig.RegisterFlags("querier.frontend-client", f) } // Worker is the counter-part to the frontend, actually processing requests. @@ -142,7 +147,7 @@ func (w *worker) watchDNSLoop() { // runMany starts N runOne loops for a given address. func (w *worker) runMany(ctx context.Context, address string) { - client, err := connect(address) + client, err := w.connect(address) if err != nil { level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err) return @@ -206,14 +211,15 @@ func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error { } } -func connect(address string) (FrontendClient, error) { - conn, err := grpc.Dial( - address, +func (w *worker) connect(address string) (FrontendClient, error) { + opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( middleware.ClientUserHeaderInterceptor, )), - ) + } + opts = append(opts, w.cfg.GRPCClientConfig.DialOptions()...) + conn, err := grpc.Dial(address, opts...) if err != nil { return nil, err } diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go new file mode 100644 index 0000000000..582679b130 --- /dev/null +++ b/pkg/util/grpcclient/grpcclient.go @@ -0,0 +1,43 @@ +package grpcclient + +import ( + "flag" + + "google.golang.org/grpc" +) + +// Config for a gRPC client. +type Config struct { + MaxRecvMsgSize int `yaml:"max_recv_msg_size"` + MaxSendMsgSize int `yaml:"max_send_msg_size"` + UseGzipCompression bool `yaml:"use_gzip_compression"` + ConnectionPoolSize int `yaml:"connect_pool_size"` +} + +// RegisterFlags registers flags. +func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") + f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") + f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.") + f.IntVar(&cfg.ConnectionPoolSize, prefix+".grpc-connection-pool-size", 1, "Number of connections to keep active per endpoint.") +} + +// CallOptions returns the config in terms of CallOptions. +func (cfg *Config) CallOptions() []grpc.CallOption { + var opts []grpc.CallOption + opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)) + opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize)) + if cfg.UseGzipCompression { + opts = append(opts, grpc.UseCompressor("gzip")) + } + return opts +} + +// DialOptions returns the config as a grpc.DialOptions. +func (cfg *Config) DialOptions() []grpc.DialOption { + result := []grpc.DialOption{grpc.WithDefaultCallOptions(cfg.CallOptions()...)} + if cfg.ConnectionPoolSize > 1 { + result = append(result, grpc.WithBalancer(grpc.RoundRobin(NewPoolResolver(cfg.ConnectionPoolSize)))) + } + return result +} diff --git a/pkg/util/grpcclient/pool.go b/pkg/util/grpcclient/pool.go new file mode 100644 index 0000000000..07e0ecb165 --- /dev/null +++ b/pkg/util/grpcclient/pool.go @@ -0,0 +1,53 @@ +package grpcclient + +import ( + "google.golang.org/grpc/naming" +) + +type poolResolver struct { + size int + next naming.Resolver +} + +// NewPoolResolver returns a PoolResolver. +func NewPoolResolver(size int) naming.Resolver { + resolver, _ := naming.NewDNSResolver() + return &poolResolver{ + size: size, + next: resolver, + } +} + +func (r *poolResolver) Resolve(target string) (naming.Watcher, error) { + watcher, err := r.next.Resolve(target) + if err != nil { + return nil, err + } + + return &poolWatcher{ + size: r.size, + next: watcher, + }, nil +} + +type poolWatcher struct { + size int + next naming.Watcher +} + +func (w *poolWatcher) Next() ([]*naming.Update, error) { + updates, err := w.next.Next() + if err != nil { + return nil, err + } + + result := make([]*naming.Update, len(updates)*w.size) + for i := 0; i < len(updates)*w.size; i++ { + result[i] = updates[i/w.size] + } + return result, nil +} + +func (w *poolWatcher) Close() { + w.next.Close() +}