diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdc480d88..dfa8d3907a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680 * [FEATURE] Ruler: Add support for group labels. #6665 * [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716 +* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681 * [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 93293edf82..e3ac1c9be9 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2690,6 +2690,11 @@ ha_tracker: # CLI flag: -distributor.sign-write-requests [sign_write_requests: | default = false] +# EXPERIMENTAL: If enabled, distributor would use stream connection to send +# requests to ingesters. +# CLI flag: -distributor.use-stream-push +[use_stream_push: | default = false] + ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 0922b47859..77a66e4d29 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -127,4 +127,6 @@ Currently experimental features are: - `-ingester.instance-limits.cpu-utilization` - `-ingester.instance-limits.heap-utilization` - `-store-gateway.instance-limits.cpu-utilization` - - `-store-gateway.instance-limits.heap-utilization` \ No newline at end of file + - `-store-gateway.instance-limits.heap-utilization` +- Distributor/Ingester: Stream push connection + - Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor. \ No newline at end of file diff --git a/integration/ingester_stream_push_test.go b/integration/ingester_stream_push_test.go new file mode 100644 index 0000000000..d7992be635 --- /dev/null +++ b/integration/ingester_stream_push_test.go @@ -0,0 +1,118 @@ +//go:build requires_docker +// +build requires_docker + +package integration + +import ( + "fmt" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestIngesterStreamPushConnection(t *testing.T) { + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + maxGlobalSeriesPerMetric := 300 + maxGlobalSeriesPerTenant := 1000 + + flags := BlocksStorageFlags() + flags["-distributor.use-stream-push"] = "true" + flags["-distributor.replication-factor"] = "1" + flags["-distributor.shard-by-all-labels"] = "true" + flags["-distributor.sharding-strategy"] = "shuffle-sharding" + flags["-distributor.ingestion-tenant-shard-size"] = "1" + flags["-ingester.max-series-per-user"] = "0" + flags["-ingester.max-series-per-metric"] = "0" + flags["-ingester.max-global-series-per-user"] = strconv.Itoa(maxGlobalSeriesPerTenant) + flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(maxGlobalSeriesPerMetric) + flags["-ingester.heartbeat-period"] = "1s" + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3)) + + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Wait until ingesters have heartbeated the ring after all ingesters were active, + // in order to update the number of instances. Since we have no metric, we have to + // rely on a ugly sleep. + time.Sleep(2 * time.Second) + + now := time.Now() + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + numSeriesWithSameMetricName := 0 + numSeriesTotal := 0 + maxErrorsBeforeStop := 100 + + // Try to push as many series with the same metric name as we can. + for i, errs := 0, 0; i < 10000; i++ { + series, _ := generateSeries("test_limit_per_metric", now, prompb.Label{ + Name: "cardinality", + Value: strconv.Itoa(rand.Int()), + }) + + res, err := client.Push(series) + require.NoError(t, err) + + if res.StatusCode == 200 { + numSeriesTotal++ + numSeriesWithSameMetricName++ + } else if errs++; errs >= maxErrorsBeforeStop { + break + } + } + + // Try to push as many series with the different metric name as we can. + for i, errs := 0, 0; i < 10000; i++ { + series, _ := generateSeries(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now) + res, err := client.Push(series) + require.NoError(t, err) + + if res.StatusCode == 200 { + numSeriesTotal++ + } else if errs++; errs >= maxErrorsBeforeStop { + break + } + } + + // We expect the number of series we've been successfully pushed to be around + // the limit. Due to how the global limit implementation works (lack of centralised + // coordination) the actual number of written series could be slightly different + // than the global limit, so we allow a 10% difference. + delta := 0.1 + assert.InDelta(t, maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(maxGlobalSeriesPerMetric)*delta) + assert.InDelta(t, maxGlobalSeriesPerTenant, numSeriesTotal, float64(maxGlobalSeriesPerTenant)*delta) + + // Ensure no service-specific metrics prefix is used by the wrong service. + assertServiceMetricsPrefixes(t, Distributor, distributor) + assertServiceMetricsPrefixes(t, Ingester, ingester1) + assertServiceMetricsPrefixes(t, Ingester, ingester2) + assertServiceMetricsPrefixes(t, Ingester, ingester3) +} diff --git a/pkg/cortexpb/cortex.pb.go b/pkg/cortexpb/cortex.pb.go index 3b63e15904..f2939d8a72 100644 --- a/pkg/cortexpb/cortex.pb.go +++ b/pkg/cortexpb/cortex.pb.go @@ -9,6 +9,7 @@ import ( fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" + httpgrpc "github.com/weaveworks/common/httpgrpc" io "io" math "math" math_bits "math/bits" @@ -85,7 +86,7 @@ var MetricMetadata_MetricType_value = map[string]int32{ } func (MetricMetadata_MetricType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{5, 0} + return fileDescriptor_893a47d0a749d749, []int{6, 0} } type Histogram_ResetHint int32 @@ -112,7 +113,7 @@ var Histogram_ResetHint_value = map[string]int32{ } func (Histogram_ResetHint) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{8, 0} + return fileDescriptor_893a47d0a749d749, []int{9, 0} } type WriteRequest struct { @@ -175,13 +176,65 @@ func (m *WriteRequest) GetSkipLabelNameValidation() bool { return false } +type StreamWriteRequest struct { + TenantID string `protobuf:"bytes,1,opt,name=TenantID,proto3" json:"TenantID,omitempty"` + Request *WriteRequest `protobuf:"bytes,2,opt,name=Request,proto3" json:"Request,omitempty"` +} + +func (m *StreamWriteRequest) Reset() { *m = StreamWriteRequest{} } +func (*StreamWriteRequest) ProtoMessage() {} +func (*StreamWriteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_893a47d0a749d749, []int{1} +} +func (m *StreamWriteRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamWriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamWriteRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamWriteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamWriteRequest.Merge(m, src) +} +func (m *StreamWriteRequest) XXX_Size() int { + return m.Size() +} +func (m *StreamWriteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StreamWriteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamWriteRequest proto.InternalMessageInfo + +func (m *StreamWriteRequest) GetTenantID() string { + if m != nil { + return m.TenantID + } + return "" +} + +func (m *StreamWriteRequest) GetRequest() *WriteRequest { + if m != nil { + return m.Request + } + return nil +} + type WriteResponse struct { + GRPCResponse *httpgrpc.HTTPResponse `protobuf:"bytes,1,opt,name=GRPCResponse,proto3" json:"GRPCResponse,omitempty"` } func (m *WriteResponse) Reset() { *m = WriteResponse{} } func (*WriteResponse) ProtoMessage() {} func (*WriteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{1} + return fileDescriptor_893a47d0a749d749, []int{2} } func (m *WriteResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -210,6 +263,13 @@ func (m *WriteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WriteResponse proto.InternalMessageInfo +func (m *WriteResponse) GetGRPCResponse() *httpgrpc.HTTPResponse { + if m != nil { + return m.GRPCResponse + } + return nil +} + type TimeSeries struct { Labels []LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=LabelAdapter" json:"labels"` // Sorted by time, oldest sample first. @@ -221,7 +281,7 @@ type TimeSeries struct { func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{2} + return fileDescriptor_893a47d0a749d749, []int{3} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -279,7 +339,7 @@ type LabelPair struct { func (m *LabelPair) Reset() { *m = LabelPair{} } func (*LabelPair) ProtoMessage() {} func (*LabelPair) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{3} + return fileDescriptor_893a47d0a749d749, []int{4} } func (m *LabelPair) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -330,7 +390,7 @@ type Sample struct { func (m *Sample) Reset() { *m = Sample{} } func (*Sample) ProtoMessage() {} func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{4} + return fileDescriptor_893a47d0a749d749, []int{5} } func (m *Sample) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -383,7 +443,7 @@ type MetricMetadata struct { func (m *MetricMetadata) Reset() { *m = MetricMetadata{} } func (*MetricMetadata) ProtoMessage() {} func (*MetricMetadata) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{5} + return fileDescriptor_893a47d0a749d749, []int{6} } func (m *MetricMetadata) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -447,7 +507,7 @@ type Metric struct { func (m *Metric) Reset() { *m = Metric{} } func (*Metric) ProtoMessage() {} func (*Metric) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{6} + return fileDescriptor_893a47d0a749d749, []int{7} } func (m *Metric) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -486,7 +546,7 @@ type Exemplar struct { func (m *Exemplar) Reset() { *m = Exemplar{} } func (*Exemplar) ProtoMessage() {} func (*Exemplar) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{7} + return fileDescriptor_893a47d0a749d749, []int{8} } func (m *Exemplar) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -579,7 +639,7 @@ type Histogram struct { func (m *Histogram) Reset() { *m = Histogram{} } func (*Histogram) ProtoMessage() {} func (*Histogram) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{8} + return fileDescriptor_893a47d0a749d749, []int{9} } func (m *Histogram) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -780,7 +840,7 @@ type BucketSpan struct { func (m *BucketSpan) Reset() { *m = BucketSpan{} } func (*BucketSpan) ProtoMessage() {} func (*BucketSpan) Descriptor() ([]byte, []int) { - return fileDescriptor_893a47d0a749d749, []int{9} + return fileDescriptor_893a47d0a749d749, []int{10} } func (m *BucketSpan) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -828,6 +888,7 @@ func init() { proto.RegisterEnum("cortexpb.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value) proto.RegisterEnum("cortexpb.Histogram_ResetHint", Histogram_ResetHint_name, Histogram_ResetHint_value) proto.RegisterType((*WriteRequest)(nil), "cortexpb.WriteRequest") + proto.RegisterType((*StreamWriteRequest)(nil), "cortexpb.StreamWriteRequest") proto.RegisterType((*WriteResponse)(nil), "cortexpb.WriteResponse") proto.RegisterType((*TimeSeries)(nil), "cortexpb.TimeSeries") proto.RegisterType((*LabelPair)(nil), "cortexpb.LabelPair") @@ -842,72 +903,78 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1031 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4b, 0x6f, 0x23, 0x45, - 0x17, 0xed, 0x72, 0xfb, 0x79, 0x63, 0x3b, 0x3d, 0xf5, 0x45, 0x1f, 0xad, 0x48, 0xd3, 0x71, 0x1a, - 0x01, 0x16, 0x42, 0x01, 0x05, 0x01, 0x9a, 0x51, 0x84, 0x64, 0x0f, 0xce, 0x43, 0x33, 0x76, 0xa2, - 0xb2, 0xc3, 0x68, 0xd8, 0x58, 0x15, 0xa7, 0x12, 0xb7, 0xa6, 0x5f, 0x74, 0x95, 0xa3, 0x09, 0x2b, - 0x56, 0x88, 0x25, 0x6b, 0xb6, 0x6c, 0xf8, 0x05, 0xfc, 0x86, 0x2c, 0xb3, 0x1c, 0xb1, 0x88, 0x88, - 0xb3, 0x99, 0xe5, 0x2c, 0xf8, 0x01, 0xa8, 0xaa, 0x5f, 0xce, 0x84, 0x11, 0x9b, 0xd9, 0x55, 0x9d, - 0x7b, 0xcf, 0xbd, 0xa7, 0xea, 0x9e, 0x2e, 0x35, 0xd4, 0x27, 0x41, 0x24, 0xd8, 0x8b, 0x8d, 0x30, - 0x0a, 0x44, 0x80, 0xab, 0xf1, 0x2e, 0x3c, 0x5a, 0x5d, 0x39, 0x0d, 0x4e, 0x03, 0x05, 0x7e, 0x2a, - 0x57, 0x71, 0xdc, 0xfe, 0xa3, 0x00, 0xf5, 0xa7, 0x91, 0x23, 0x18, 0x61, 0xdf, 0xcf, 0x18, 0x17, - 0xf8, 0x00, 0x40, 0x38, 0x1e, 0xe3, 0x2c, 0x72, 0x18, 0x37, 0x51, 0x4b, 0x6f, 0x2f, 0x6d, 0xae, - 0x6c, 0xa4, 0x55, 0x36, 0x46, 0x8e, 0xc7, 0x86, 0x2a, 0xd6, 0x5d, 0xbd, 0xb8, 0x5a, 0xd3, 0xfe, - 0xbc, 0x5a, 0xc3, 0x07, 0x11, 0xa3, 0xae, 0x1b, 0x4c, 0x46, 0x19, 0x8f, 0x2c, 0xd4, 0xc0, 0x0f, - 0xa0, 0x3c, 0x0c, 0x66, 0xd1, 0x84, 0x99, 0x85, 0x16, 0x6a, 0x37, 0x37, 0xd7, 0xf3, 0x6a, 0x8b, - 0x9d, 0x37, 0xe2, 0xa4, 0x9e, 0x3f, 0xf3, 0x48, 0x42, 0xc0, 0x0f, 0xa1, 0xea, 0x31, 0x41, 0x8f, - 0xa9, 0xa0, 0xa6, 0xae, 0xa4, 0x98, 0x39, 0xb9, 0xcf, 0x44, 0xe4, 0x4c, 0xfa, 0x49, 0xbc, 0x5b, - 0xbc, 0xb8, 0x5a, 0x43, 0x24, 0xcb, 0xc7, 0x5b, 0xb0, 0xca, 0x9f, 0x3b, 0xe1, 0xd8, 0xa5, 0x47, - 0xcc, 0x1d, 0xfb, 0xd4, 0x63, 0xe3, 0x33, 0xea, 0x3a, 0xc7, 0x54, 0x38, 0x81, 0x6f, 0xbe, 0xaa, - 0xb4, 0x50, 0xbb, 0x4a, 0xde, 0x93, 0x29, 0x4f, 0x64, 0xc6, 0x80, 0x7a, 0xec, 0xdb, 0x2c, 0x6e, - 0xaf, 0x01, 0xe4, 0x7a, 0x70, 0x05, 0xf4, 0xce, 0xc1, 0x9e, 0xa1, 0xe1, 0x2a, 0x14, 0xc9, 0xe1, - 0x93, 0x9e, 0x81, 0xec, 0x65, 0x68, 0x24, 0xea, 0x79, 0x18, 0xf8, 0x9c, 0xd9, 0x7f, 0x23, 0x80, - 0xfc, 0x76, 0x70, 0x07, 0xca, 0xaa, 0x73, 0x7a, 0x87, 0xff, 0xcb, 0x85, 0xab, 0x7e, 0x07, 0xd4, - 0x89, 0xba, 0x2b, 0xc9, 0x15, 0xd6, 0x15, 0xd4, 0x39, 0xa6, 0xa1, 0x60, 0x11, 0x49, 0x88, 0xf8, - 0x33, 0xa8, 0x70, 0xea, 0x85, 0x2e, 0xe3, 0x66, 0x41, 0xd5, 0x30, 0xf2, 0x1a, 0x43, 0x15, 0x50, - 0x87, 0xd6, 0x48, 0x9a, 0x86, 0xbf, 0x84, 0x1a, 0x7b, 0xc1, 0xbc, 0xd0, 0xa5, 0x11, 0x4f, 0x2e, - 0x0c, 0xe7, 0x9c, 0x5e, 0x12, 0x4a, 0x58, 0x79, 0x2a, 0x7e, 0x00, 0x30, 0x75, 0xb8, 0x08, 0x4e, - 0x23, 0xea, 0x71, 0xb3, 0xf8, 0xa6, 0xe0, 0xdd, 0x34, 0x96, 0x30, 0x17, 0x92, 0xed, 0x2f, 0xa0, - 0x96, 0x9d, 0x07, 0x63, 0x28, 0xca, 0x8b, 0x36, 0x51, 0x0b, 0xb5, 0xeb, 0x44, 0xad, 0xf1, 0x0a, - 0x94, 0xce, 0xa8, 0x3b, 0x8b, 0xa7, 0x5f, 0x27, 0xf1, 0xc6, 0xee, 0x40, 0x39, 0x3e, 0x42, 0x1e, - 0x97, 0x24, 0x94, 0xc4, 0xf1, 0x3a, 0xd4, 0x95, 0x85, 0x04, 0xf5, 0xc2, 0xb1, 0xc7, 0x15, 0x59, - 0x27, 0x4b, 0x19, 0xd6, 0xe7, 0xf6, 0xaf, 0x05, 0x68, 0xde, 0xf6, 0x00, 0xfe, 0x0a, 0x8a, 0xe2, - 0x3c, 0x8c, 0x4b, 0x35, 0x37, 0xdf, 0x7f, 0x9b, 0x57, 0x92, 0xed, 0xe8, 0x3c, 0x64, 0x44, 0x11, - 0xf0, 0x27, 0x80, 0x3d, 0x85, 0x8d, 0x4f, 0xa8, 0xe7, 0xb8, 0xe7, 0xca, 0x2f, 0xaa, 0x69, 0x8d, - 0x18, 0x71, 0x64, 0x5b, 0x05, 0xa4, 0x4d, 0xe4, 0x31, 0xa7, 0xcc, 0x0d, 0xcd, 0xa2, 0x8a, 0xab, - 0xb5, 0xc4, 0x66, 0xbe, 0x23, 0xcc, 0x52, 0x8c, 0xc9, 0xb5, 0x7d, 0x0e, 0x90, 0x77, 0xc2, 0x4b, - 0x50, 0x39, 0x1c, 0x3c, 0x1e, 0xec, 0x3f, 0x1d, 0x18, 0x9a, 0xdc, 0x3c, 0xda, 0x3f, 0x1c, 0x8c, - 0x7a, 0xc4, 0x40, 0xb8, 0x06, 0xa5, 0x9d, 0xce, 0xe1, 0x4e, 0xcf, 0x28, 0xe0, 0x06, 0xd4, 0x76, - 0xf7, 0x86, 0xa3, 0xfd, 0x1d, 0xd2, 0xe9, 0x1b, 0x3a, 0xc6, 0xd0, 0x54, 0x91, 0x1c, 0x2b, 0x4a, - 0xea, 0xf0, 0xb0, 0xdf, 0xef, 0x90, 0x67, 0x46, 0x49, 0x1a, 0x72, 0x6f, 0xb0, 0xbd, 0x6f, 0x94, - 0x71, 0x1d, 0xaa, 0xc3, 0x51, 0x67, 0xd4, 0x1b, 0xf6, 0x46, 0x46, 0xc5, 0x7e, 0x0c, 0xe5, 0xb8, - 0xf5, 0x3b, 0x30, 0xa2, 0xfd, 0x13, 0x82, 0x6a, 0x6a, 0x9e, 0x77, 0x61, 0xec, 0x5b, 0x96, 0x78, - 0xeb, 0xc8, 0xf5, 0xbb, 0x23, 0xbf, 0x2c, 0x41, 0x2d, 0x33, 0x23, 0xbe, 0x0f, 0xb5, 0x49, 0x30, - 0xf3, 0xc5, 0xd8, 0xf1, 0x85, 0x1a, 0x79, 0x71, 0x57, 0x23, 0x55, 0x05, 0xed, 0xf9, 0x02, 0xaf, - 0xc3, 0x52, 0x1c, 0x3e, 0x71, 0x03, 0x2a, 0xe2, 0x5e, 0xbb, 0x1a, 0x01, 0x05, 0x6e, 0x4b, 0x0c, - 0x1b, 0xa0, 0xf3, 0x99, 0xa7, 0x3a, 0x21, 0x22, 0x97, 0xf8, 0xff, 0x50, 0xe6, 0x93, 0x29, 0xf3, - 0xa8, 0x1a, 0xee, 0x3d, 0x92, 0xec, 0xf0, 0x07, 0xd0, 0xfc, 0x81, 0x45, 0xc1, 0x58, 0x4c, 0x23, - 0xc6, 0xa7, 0x81, 0x7b, 0xac, 0x06, 0x8d, 0x48, 0x43, 0xa2, 0xa3, 0x14, 0xc4, 0x1f, 0x26, 0x69, - 0xb9, 0xae, 0xb2, 0xd2, 0x85, 0x48, 0x5d, 0xe2, 0x8f, 0x52, 0x6d, 0x1f, 0x83, 0xb1, 0x90, 0x17, - 0x0b, 0xac, 0x28, 0x81, 0x88, 0x34, 0xb3, 0xcc, 0x58, 0x64, 0x07, 0x9a, 0x3e, 0x3b, 0xa5, 0xc2, - 0x39, 0x63, 0x63, 0x1e, 0x52, 0x9f, 0x9b, 0xd5, 0x37, 0x5f, 0xe5, 0xee, 0x6c, 0xf2, 0x9c, 0x89, - 0x61, 0x48, 0xfd, 0xe4, 0x0b, 0x6d, 0xa4, 0x0c, 0x89, 0x71, 0xfc, 0x11, 0x2c, 0x67, 0x25, 0x8e, - 0x99, 0x2b, 0x28, 0x37, 0x6b, 0x2d, 0xbd, 0x8d, 0x49, 0x56, 0xf9, 0x1b, 0x85, 0xde, 0x4a, 0x54, - 0xda, 0xb8, 0x09, 0x2d, 0xbd, 0x8d, 0xf2, 0x44, 0x25, 0x4c, 0x3e, 0x6f, 0xcd, 0x30, 0xe0, 0xce, - 0x82, 0xa8, 0xa5, 0xff, 0x16, 0x95, 0x32, 0x32, 0x51, 0x59, 0x89, 0x44, 0x54, 0x3d, 0x16, 0x95, - 0xc2, 0xb9, 0xa8, 0x2c, 0x31, 0x11, 0xd5, 0x88, 0x45, 0xa5, 0x70, 0x22, 0x6a, 0x0b, 0x20, 0x62, - 0x9c, 0x89, 0xf1, 0x54, 0xde, 0x7c, 0x53, 0x3d, 0x02, 0xf7, 0xff, 0xe5, 0x19, 0xdb, 0x20, 0x32, - 0x6b, 0xd7, 0xf1, 0x05, 0xa9, 0x45, 0xe9, 0xf2, 0x8e, 0xff, 0x96, 0xef, 0xfa, 0xef, 0x21, 0xd4, - 0x32, 0xea, 0xed, 0xef, 0xb9, 0x02, 0xfa, 0xb3, 0xde, 0xd0, 0x40, 0xb8, 0x0c, 0x85, 0xc1, 0xbe, - 0x51, 0xc8, 0xbf, 0x69, 0x7d, 0xb5, 0xf8, 0xf3, 0x6f, 0x16, 0xea, 0x56, 0xa0, 0xa4, 0xc4, 0x77, - 0xeb, 0x00, 0xf9, 0xec, 0xed, 0x2d, 0x80, 0xfc, 0xa2, 0xa4, 0xfd, 0x82, 0x93, 0x13, 0xce, 0x62, - 0x3f, 0xdf, 0x23, 0xc9, 0x4e, 0xe2, 0x2e, 0xf3, 0x4f, 0xc5, 0x54, 0xd9, 0xb8, 0x41, 0x92, 0x5d, - 0xf7, 0xeb, 0xcb, 0x6b, 0x4b, 0x7b, 0x79, 0x6d, 0x69, 0xaf, 0xaf, 0x2d, 0xf4, 0xe3, 0xdc, 0x42, - 0xbf, 0xcf, 0x2d, 0x74, 0x31, 0xb7, 0xd0, 0xe5, 0xdc, 0x42, 0x7f, 0xcd, 0x2d, 0xf4, 0x6a, 0x6e, - 0x69, 0xaf, 0xe7, 0x16, 0xfa, 0xe5, 0xc6, 0xd2, 0x2e, 0x6f, 0x2c, 0xed, 0xe5, 0x8d, 0xa5, 0x7d, - 0x97, 0xfd, 0x14, 0x1c, 0x95, 0xd5, 0x5f, 0xc0, 0xe7, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x4b, - 0xb6, 0xdb, 0xd4, 0x35, 0x08, 0x00, 0x00, + // 1127 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x3b, 0x6f, 0x1b, 0x47, + 0x10, 0xbe, 0x25, 0x29, 0x3e, 0x46, 0x14, 0x7d, 0xde, 0x08, 0xce, 0x41, 0x80, 0x4f, 0xf4, 0x05, + 0x49, 0x88, 0x20, 0xa0, 0x02, 0xe5, 0x05, 0x1b, 0x46, 0x00, 0xd2, 0xa6, 0x45, 0xc1, 0x26, 0x45, + 0x2c, 0x4f, 0x31, 0x9c, 0x86, 0x58, 0x51, 0x2b, 0xf2, 0xe0, 0x7b, 0xe5, 0x76, 0x29, 0x5b, 0xa9, + 0x52, 0x05, 0x29, 0x53, 0xa7, 0x4d, 0x93, 0x5f, 0x90, 0xdf, 0xe0, 0x52, 0xa5, 0x91, 0xc2, 0x88, + 0xe9, 0xc6, 0xa5, 0x8b, 0xfc, 0x80, 0x60, 0xf7, 0x5e, 0xa2, 0x1f, 0x48, 0xe3, 0x6e, 0xe7, 0x9b, + 0x6f, 0x66, 0xbf, 0x9b, 0xf9, 0xee, 0x48, 0xa8, 0x4f, 0x83, 0x48, 0xb0, 0xc7, 0xed, 0x30, 0x0a, + 0x44, 0x80, 0xab, 0x71, 0x14, 0x1e, 0x6d, 0x6d, 0xce, 0x82, 0x59, 0xa0, 0xc0, 0x1d, 0x79, 0x8a, + 0xf3, 0x5b, 0x5f, 0xcd, 0x1c, 0x31, 0x5f, 0x1c, 0xb5, 0xa7, 0x81, 0xb7, 0xf3, 0x88, 0xd1, 0x53, + 0xf6, 0x28, 0x88, 0x1e, 0xf2, 0x9d, 0x69, 0xe0, 0x79, 0x81, 0xbf, 0x33, 0x17, 0x22, 0x9c, 0x45, + 0xe1, 0x34, 0x3b, 0xc4, 0x55, 0xd6, 0x5f, 0x05, 0xa8, 0xdf, 0x8f, 0x1c, 0xc1, 0x08, 0xfb, 0x71, + 0xc1, 0xb8, 0xc0, 0x23, 0x00, 0xe1, 0x78, 0x8c, 0xb3, 0xc8, 0x61, 0xdc, 0x40, 0xcd, 0x62, 0x6b, + 0x7d, 0x77, 0xb3, 0x9d, 0xde, 0xdd, 0xb6, 0x1d, 0x8f, 0x8d, 0x55, 0xae, 0xbb, 0xf5, 0xe4, 0xd9, + 0xb6, 0xf6, 0xf7, 0xb3, 0x6d, 0x3c, 0x8a, 0x18, 0x75, 0xdd, 0x60, 0x6a, 0x67, 0x75, 0xe4, 0x42, + 0x0f, 0x7c, 0x1d, 0xca, 0xe3, 0x60, 0x11, 0x4d, 0x99, 0x51, 0x68, 0xa2, 0x56, 0x63, 0xf7, 0x5a, + 0xde, 0xed, 0xe2, 0xcd, 0xed, 0x98, 0xd4, 0xf3, 0x17, 0x1e, 0x49, 0x0a, 0xf0, 0x0d, 0xa8, 0x7a, + 0x4c, 0xd0, 0x63, 0x2a, 0xa8, 0x51, 0x54, 0x52, 0x8c, 0xbc, 0x78, 0xc0, 0x44, 0xe4, 0x4c, 0x07, + 0x49, 0xbe, 0x5b, 0x7a, 0xf2, 0x6c, 0x1b, 0x91, 0x8c, 0x8f, 0x6f, 0xc2, 0x16, 0x7f, 0xe8, 0x84, + 0x13, 0x97, 0x1e, 0x31, 0x77, 0xe2, 0x53, 0x8f, 0x4d, 0x4e, 0xa9, 0xeb, 0x1c, 0x53, 0xe1, 0x04, + 0xbe, 0xf1, 0xb2, 0xd2, 0x44, 0xad, 0x2a, 0xf9, 0x50, 0x52, 0xee, 0x49, 0xc6, 0x90, 0x7a, 0xec, + 0xfb, 0x2c, 0x6f, 0x6d, 0x03, 0xe4, 0x7a, 0x70, 0x05, 0x8a, 0x9d, 0xd1, 0xbe, 0xae, 0xe1, 0x2a, + 0x94, 0xc8, 0xe1, 0xbd, 0x9e, 0x8e, 0xac, 0x23, 0xc0, 0x63, 0x11, 0x31, 0xea, 0xad, 0x4c, 0x6f, + 0x0b, 0xaa, 0x36, 0xf3, 0xa9, 0x2f, 0xf6, 0x6f, 0x1b, 0xa8, 0x89, 0x5a, 0x35, 0x92, 0xc5, 0xf8, + 0x0b, 0xa8, 0x24, 0x34, 0x35, 0x88, 0xf5, 0xdd, 0x2b, 0x6f, 0x1f, 0x04, 0x49, 0x69, 0xd6, 0x5d, + 0xd8, 0x48, 0x12, 0x3c, 0x0c, 0x7c, 0x2e, 0xe7, 0x51, 0xdf, 0x23, 0xa3, 0x5b, 0x69, 0xac, 0xae, + 0x90, 0x7d, 0xb2, 0xa5, 0xf6, 0x6d, 0x7b, 0x94, 0x66, 0xc9, 0x0a, 0xd7, 0xfa, 0x17, 0x01, 0xe4, + 0xdb, 0xc3, 0x1d, 0x28, 0xab, 0xc9, 0xa4, 0x3b, 0xfe, 0x20, 0x17, 0xa3, 0xe6, 0x31, 0xa2, 0x4e, + 0xd4, 0xdd, 0x4c, 0x56, 0x5c, 0x57, 0x50, 0xe7, 0x98, 0x86, 0x82, 0x45, 0x24, 0x29, 0x94, 0x0f, + 0xc4, 0xa9, 0x17, 0xba, 0x8c, 0x1b, 0x05, 0xd5, 0x43, 0xcf, 0x7b, 0x8c, 0x55, 0x42, 0x2d, 0x45, + 0x23, 0x29, 0x0d, 0x7f, 0x03, 0x35, 0xf6, 0x98, 0x79, 0xa1, 0x4b, 0x23, 0x9e, 0x2c, 0x14, 0xe7, + 0x35, 0xbd, 0x24, 0x95, 0x54, 0xe5, 0x54, 0x7c, 0x1d, 0x60, 0xee, 0x70, 0x11, 0xcc, 0x22, 0xea, + 0x71, 0xa3, 0xf4, 0xba, 0xe0, 0x7e, 0x9a, 0x4b, 0x2a, 0x2f, 0x90, 0xad, 0xaf, 0xa1, 0x96, 0x3d, + 0x0f, 0xc6, 0x50, 0x92, 0x46, 0x50, 0x73, 0xab, 0x13, 0x75, 0xc6, 0x9b, 0xb0, 0x76, 0x4a, 0xdd, + 0x45, 0xec, 0xce, 0x3a, 0x89, 0x03, 0xab, 0x03, 0xe5, 0xf8, 0x11, 0xf2, 0xbc, 0x2c, 0x42, 0x49, + 0x1e, 0x5f, 0x83, 0xba, 0xb2, 0xb8, 0xa0, 0x5e, 0x38, 0xf1, 0xb8, 0x2a, 0x2e, 0x92, 0xf5, 0x0c, + 0x1b, 0x70, 0xeb, 0xf7, 0x02, 0x34, 0x56, 0x3d, 0x8a, 0xbf, 0x85, 0x92, 0x38, 0x0b, 0xe3, 0x56, + 0x8d, 0xdd, 0x8f, 0xde, 0xe5, 0xe5, 0x24, 0xb4, 0xcf, 0x42, 0x46, 0x54, 0x01, 0xfe, 0x1c, 0xb0, + 0xa7, 0xb0, 0xc9, 0x09, 0xf5, 0x1c, 0xf7, 0x4c, 0xf9, 0x59, 0x5d, 0x5a, 0x23, 0x7a, 0x9c, 0xb9, + 0xa3, 0x12, 0xd2, 0xc6, 0xf2, 0x31, 0xe7, 0xcc, 0x0d, 0x8d, 0x92, 0xca, 0xab, 0xb3, 0xc4, 0x16, + 0xbe, 0x23, 0x8c, 0xb5, 0x18, 0x93, 0x67, 0xeb, 0x0c, 0x20, 0xbf, 0x09, 0xaf, 0x43, 0xe5, 0x70, + 0x78, 0x77, 0x78, 0x70, 0x7f, 0xa8, 0x6b, 0x32, 0xb8, 0x75, 0x70, 0x38, 0xb4, 0x7b, 0x44, 0x47, + 0xb8, 0x06, 0x6b, 0x7b, 0x9d, 0xc3, 0xbd, 0x9e, 0x5e, 0xc0, 0x1b, 0x50, 0xeb, 0xef, 0x8f, 0xed, + 0x83, 0x3d, 0xd2, 0x19, 0xe8, 0x45, 0x8c, 0xa1, 0xa1, 0x32, 0x39, 0x56, 0x92, 0xa5, 0xe3, 0xc3, + 0xc1, 0xa0, 0x43, 0x1e, 0xe8, 0x6b, 0xf2, 0x85, 0xd9, 0x1f, 0xde, 0x39, 0xd0, 0xcb, 0xb8, 0x0e, + 0xd5, 0xb1, 0xdd, 0xb1, 0x7b, 0xe3, 0x9e, 0xad, 0x57, 0xac, 0xbb, 0x50, 0x8e, 0xaf, 0x7e, 0x0f, + 0x46, 0xb4, 0x7e, 0x41, 0x50, 0x4d, 0xcd, 0xf3, 0x3e, 0x8c, 0xbd, 0x62, 0x89, 0x77, 0xae, 0xbc, + 0xf8, 0xe6, 0xca, 0xcf, 0xd7, 0xa0, 0x96, 0x99, 0x11, 0x5f, 0x85, 0xda, 0x34, 0x58, 0xf8, 0x62, + 0xe2, 0xf8, 0x42, 0xad, 0xbc, 0xd4, 0xd7, 0x48, 0x55, 0x41, 0xfb, 0xbe, 0xc0, 0xd7, 0x60, 0x3d, + 0x4e, 0x9f, 0xb8, 0x01, 0x8d, 0xbf, 0x09, 0xa8, 0xaf, 0x11, 0x50, 0xe0, 0x1d, 0x89, 0x61, 0x1d, + 0x8a, 0x7c, 0xe1, 0xa9, 0x9b, 0x10, 0x91, 0x47, 0x7c, 0x05, 0xca, 0x7c, 0x3a, 0x67, 0x1e, 0x55, + 0xcb, 0xbd, 0x4c, 0x92, 0x08, 0x7f, 0x0c, 0x8d, 0x9f, 0x58, 0x14, 0x4c, 0xc4, 0x3c, 0x62, 0x7c, + 0x1e, 0xb8, 0xc7, 0x6a, 0xd1, 0x88, 0x6c, 0x48, 0xd4, 0x4e, 0x41, 0xfc, 0x49, 0x42, 0xcb, 0x75, + 0x95, 0x95, 0x2e, 0x44, 0xea, 0x12, 0xbf, 0x95, 0x6a, 0xfb, 0x0c, 0xf4, 0x0b, 0xbc, 0x58, 0x60, + 0x45, 0x09, 0x44, 0xa4, 0x91, 0x31, 0x63, 0x91, 0x1d, 0x68, 0xf8, 0x6c, 0x46, 0x85, 0x73, 0xca, + 0x26, 0x3c, 0xa4, 0x3e, 0x37, 0xaa, 0xaf, 0xff, 0x6a, 0x74, 0x17, 0xd3, 0x87, 0x4c, 0x8c, 0x43, + 0xea, 0x27, 0x6f, 0xe8, 0x46, 0x5a, 0x21, 0x31, 0x8e, 0x3f, 0x85, 0x4b, 0x59, 0x8b, 0x63, 0xe6, + 0x0a, 0xca, 0x8d, 0x5a, 0xb3, 0xd8, 0xc2, 0x24, 0xeb, 0x7c, 0x5b, 0xa1, 0x2b, 0x44, 0xa5, 0x8d, + 0x1b, 0xd0, 0x2c, 0xb6, 0x50, 0x4e, 0x54, 0xc2, 0xe4, 0xe7, 0xad, 0x11, 0x06, 0xdc, 0xb9, 0x20, + 0x6a, 0xfd, 0xff, 0x45, 0xa5, 0x15, 0x99, 0xa8, 0xac, 0x45, 0x22, 0xaa, 0x1e, 0x8b, 0x4a, 0xe1, + 0x5c, 0x54, 0x46, 0x4c, 0x44, 0x6d, 0xc4, 0xa2, 0x52, 0x38, 0x11, 0x75, 0x13, 0x20, 0x62, 0x9c, + 0x89, 0xc9, 0x5c, 0x4e, 0xbe, 0xa1, 0x3e, 0x02, 0x57, 0xdf, 0xf2, 0x19, 0x6b, 0x13, 0xc9, 0xea, + 0x3b, 0xbe, 0x20, 0xb5, 0x28, 0x3d, 0xbe, 0xe1, 0xbf, 0x4b, 0x6f, 0xfa, 0xef, 0x06, 0xd4, 0xb2, + 0xd2, 0xd5, 0xf7, 0xb9, 0x02, 0xc5, 0x07, 0xbd, 0xb1, 0x8e, 0x70, 0x19, 0x0a, 0xc3, 0x03, 0xbd, + 0x90, 0xbf, 0xd3, 0xc5, 0xad, 0xd2, 0xaf, 0x7f, 0x98, 0xa8, 0x5b, 0x81, 0x35, 0x25, 0xbe, 0x5b, + 0x07, 0xc8, 0x77, 0x6f, 0xdd, 0x04, 0xc8, 0x07, 0x25, 0xed, 0x17, 0x9c, 0x9c, 0x70, 0x16, 0xfb, + 0xf9, 0x32, 0x49, 0x22, 0x89, 0xbb, 0xcc, 0x9f, 0x89, 0xb9, 0xb2, 0xf1, 0x06, 0x49, 0xa2, 0xee, + 0x77, 0xe7, 0xcf, 0x4d, 0xed, 0xe9, 0x73, 0x53, 0x7b, 0xf5, 0xdc, 0x44, 0x3f, 0x2f, 0x4d, 0xf4, + 0xe7, 0xd2, 0x44, 0x4f, 0x96, 0x26, 0x3a, 0x5f, 0x9a, 0xe8, 0x9f, 0xa5, 0x89, 0x5e, 0x2e, 0x4d, + 0xed, 0xd5, 0xd2, 0x44, 0xbf, 0xbd, 0x30, 0xb5, 0xf3, 0x17, 0xa6, 0xf6, 0xf4, 0x85, 0xa9, 0xfd, + 0x90, 0xfd, 0xd5, 0x39, 0x2a, 0xab, 0x7f, 0x29, 0x5f, 0xfe, 0x17, 0x00, 0x00, 0xff, 0xff, 0xb2, + 0x30, 0x5f, 0x72, 0x0b, 0x09, 0x00, 0x00, } func (x WriteRequest_SourceEnum) String() string { @@ -974,6 +1041,33 @@ func (this *WriteRequest) Equal(that interface{}) bool { } return true } +func (this *StreamWriteRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*StreamWriteRequest) + if !ok { + that2, ok := that.(StreamWriteRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.TenantID != that1.TenantID { + return false + } + if !this.Request.Equal(that1.Request) { + return false + } + return true +} func (this *WriteResponse) Equal(that interface{}) bool { if that == nil { return this == nil @@ -993,6 +1087,9 @@ func (this *WriteResponse) Equal(that interface{}) bool { } else if this == nil { return false } + if !this.GRPCResponse.Equal(that1.GRPCResponse) { + return false + } return true } func (this *TimeSeries) Equal(that interface{}) bool { @@ -1439,12 +1536,28 @@ func (this *WriteRequest) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *StreamWriteRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&cortexpb.StreamWriteRequest{") + s = append(s, "TenantID: "+fmt.Sprintf("%#v", this.TenantID)+",\n") + if this.Request != nil { + s = append(s, "Request: "+fmt.Sprintf("%#v", this.Request)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} func (this *WriteResponse) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 4) + s := make([]string, 0, 5) s = append(s, "&cortexpb.WriteResponse{") + if this.GRPCResponse != nil { + s = append(s, "GRPCResponse: "+fmt.Sprintf("%#v", this.GRPCResponse)+",\n") + } s = append(s, "}") return strings.Join(s, "") } @@ -1693,6 +1806,48 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *StreamWriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamWriteRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamWriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Request != nil { + { + size, err := m.Request.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCortex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if len(m.TenantID) > 0 { + i -= len(m.TenantID) + copy(dAtA[i:], m.TenantID) + i = encodeVarintCortex(dAtA, i, uint64(len(m.TenantID))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *WriteResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1713,6 +1868,18 @@ func (m *WriteResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.GRPCResponse != nil { + { + size, err := m.GRPCResponse.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintCortex(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } return len(dAtA) - i, nil } @@ -2032,30 +2199,30 @@ func (m *Histogram) MarshalToSizedBuffer(dAtA []byte) (int, error) { } if len(m.PositiveCounts) > 0 { for iNdEx := len(m.PositiveCounts) - 1; iNdEx >= 0; iNdEx-- { - f1 := math.Float64bits(float64(m.PositiveCounts[iNdEx])) + f3 := math.Float64bits(float64(m.PositiveCounts[iNdEx])) i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f1)) + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f3)) } i = encodeVarintCortex(dAtA, i, uint64(len(m.PositiveCounts)*8)) i-- dAtA[i] = 0x6a } if len(m.PositiveDeltas) > 0 { - var j2 int - dAtA4 := make([]byte, len(m.PositiveDeltas)*10) + var j4 int + dAtA6 := make([]byte, len(m.PositiveDeltas)*10) for _, num := range m.PositiveDeltas { - x3 := (uint64(num) << 1) ^ uint64((num >> 63)) - for x3 >= 1<<7 { - dAtA4[j2] = uint8(uint64(x3)&0x7f | 0x80) - j2++ - x3 >>= 7 - } - dAtA4[j2] = uint8(x3) - j2++ + x5 := (uint64(num) << 1) ^ uint64((num >> 63)) + for x5 >= 1<<7 { + dAtA6[j4] = uint8(uint64(x5)&0x7f | 0x80) + j4++ + x5 >>= 7 + } + dAtA6[j4] = uint8(x5) + j4++ } - i -= j2 - copy(dAtA[i:], dAtA4[:j2]) - i = encodeVarintCortex(dAtA, i, uint64(j2)) + i -= j4 + copy(dAtA[i:], dAtA6[:j4]) + i = encodeVarintCortex(dAtA, i, uint64(j4)) i-- dAtA[i] = 0x62 } @@ -2075,30 +2242,30 @@ func (m *Histogram) MarshalToSizedBuffer(dAtA []byte) (int, error) { } if len(m.NegativeCounts) > 0 { for iNdEx := len(m.NegativeCounts) - 1; iNdEx >= 0; iNdEx-- { - f5 := math.Float64bits(float64(m.NegativeCounts[iNdEx])) + f7 := math.Float64bits(float64(m.NegativeCounts[iNdEx])) i -= 8 - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f5)) + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f7)) } i = encodeVarintCortex(dAtA, i, uint64(len(m.NegativeCounts)*8)) i-- dAtA[i] = 0x52 } if len(m.NegativeDeltas) > 0 { - var j6 int - dAtA8 := make([]byte, len(m.NegativeDeltas)*10) + var j8 int + dAtA10 := make([]byte, len(m.NegativeDeltas)*10) for _, num := range m.NegativeDeltas { - x7 := (uint64(num) << 1) ^ uint64((num >> 63)) - for x7 >= 1<<7 { - dAtA8[j6] = uint8(uint64(x7)&0x7f | 0x80) - j6++ - x7 >>= 7 - } - dAtA8[j6] = uint8(x7) - j6++ + x9 := (uint64(num) << 1) ^ uint64((num >> 63)) + for x9 >= 1<<7 { + dAtA10[j8] = uint8(uint64(x9)&0x7f | 0x80) + j8++ + x9 >>= 7 + } + dAtA10[j8] = uint8(x9) + j8++ } - i -= j6 - copy(dAtA[i:], dAtA8[:j6]) - i = encodeVarintCortex(dAtA, i, uint64(j6)) + i -= j8 + copy(dAtA[i:], dAtA10[:j8]) + i = encodeVarintCortex(dAtA, i, uint64(j8)) i-- dAtA[i] = 0x4a } @@ -2271,12 +2438,33 @@ func (m *WriteRequest) Size() (n int) { return n } +func (m *StreamWriteRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.TenantID) + if l > 0 { + n += 1 + l + sovCortex(uint64(l)) + } + if m.Request != nil { + l = m.Request.Size() + n += 1 + l + sovCortex(uint64(l)) + } + return n +} + func (m *WriteResponse) Size() (n int) { if m == nil { return 0 } var l int _ = l + if m.GRPCResponse != nil { + l = m.GRPCResponse.Size() + n += 1 + l + sovCortex(uint64(l)) + } return n } @@ -2542,11 +2730,23 @@ func (this *WriteRequest) String() string { }, "") return s } +func (this *StreamWriteRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&StreamWriteRequest{`, + `TenantID:` + fmt.Sprintf("%v", this.TenantID) + `,`, + `Request:` + strings.Replace(this.Request.String(), "WriteRequest", "WriteRequest", 1) + `,`, + `}`, + }, "") + return s +} func (this *WriteResponse) String() string { if this == nil { return "nil" } s := strings.Join([]string{`&WriteResponse{`, + `GRPCResponse:` + strings.Replace(fmt.Sprintf("%v", this.GRPCResponse), "HTTPResponse", "httpgrpc.HTTPResponse", 1) + `,`, `}`, }, "") return s @@ -2887,6 +3087,127 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { } return nil } +func (m *StreamWriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamWriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamWriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TenantID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TenantID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Request", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Request == nil { + m.Request = &WriteRequest{} + } + if err := m.Request.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *WriteResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2916,6 +3237,42 @@ func (m *WriteResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: WriteResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GRPCResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCortex + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.GRPCResponse == nil { + m.GRPCResponse = &httpgrpc.HTTPResponse{} + } + if err := m.GRPCResponse.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index cedb173183..0705819c5c 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -5,6 +5,7 @@ package cortexpb; option go_package = "cortexpb"; import "gogoproto/gogo.proto"; +import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; option (gogoproto.marshaler_all) = true; option (gogoproto.unmarshaler_all) = true; @@ -21,7 +22,14 @@ message WriteRequest { bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus } -message WriteResponse {} +message StreamWriteRequest { + string TenantID = 1; + WriteRequest Request = 2; +} + +message WriteResponse { + httpgrpc.HTTPResponse GRPCResponse = 1; +} message TimeSeries { repeated LabelPair labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "LabelAdapter"]; diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7560af571a..734a5fa034 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -4,7 +4,7 @@ import ( "context" "flag" "fmt" - io "io" + "io" "net/http" "sort" "strings" @@ -151,6 +151,7 @@ type Config struct { ShardByAllLabels bool `yaml:"shard_by_all_labels"` ExtendWrites bool `yaml:"extend_writes"` SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` + UseStreamPush bool `yaml:"use_stream_push"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -205,6 +206,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.") + f.BoolVar(&cfg.UseStreamPush, "distributor.use-stream-push", false, "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.") f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.") @@ -243,7 +245,7 @@ const ( func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) { - return ingester_client.MakeIngesterClient(addr, clientConfig) + return ingester_client.MakeIngesterClient(addr, clientConfig, cfg.UseStreamPush) } } @@ -1140,20 +1142,29 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time c := h.(ingester_client.HealthAndIngesterClient) - req := cortexpb.PreallocWriteRequestFromPool() - req.Timeseries = timeseries - req.Metadata = metadata - req.Source = source - d.inflightClientRequests.Inc() defer d.inflightClientRequests.Dec() - _, err = c.PushPreAlloc(ctx, req) + if d.cfg.UseStreamPush { + req := &cortexpb.WriteRequest{ + Timeseries: timeseries, + Metadata: metadata, + Source: source, + } + _, err = c.PushStreamConnection(ctx, req) + } else { + req := cortexpb.PreallocWriteRequestFromPool() + req.Timeseries = timeseries + req.Metadata = metadata + req.Source = source - // We should not reuse the req in case of errors: - // See: https://github.com/grpc/grpc-go/issues/6355 - if err == nil { - cortexpb.ReuseWriteRequest(req) + _, err = c.PushPreAlloc(ctx, req) + + // We should not reuse the req in case of errors: + // See: https://github.com/grpc/grpc-go/issues/6355 + if err == nil { + cortexpb.ReuseWriteRequest(req) + } } if len(metadata) > 0 { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3618f285fe..2d3ceb62c1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -352,46 +352,50 @@ func TestDistributor_Push(t *testing.T) { `, }, } { - for _, shardByAllLabels := range []bool{true, false} { - tc := tc - name := name - shardByAllLabels := shardByAllLabels - t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v)", name, shardByAllLabels), func(t *testing.T) { - t.Parallel() - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.IngestionRate = 20 - limits.IngestionBurstSize = 20 + for _, useStreamPush := range []bool{false, true} { + for _, shardByAllLabels := range []bool{true, false} { + tc := tc + name := name + shardByAllLabels := shardByAllLabels + useStreamPush := useStreamPush + t.Run(fmt.Sprintf("[%s](shardByAllLabels=%v,useStreamPush=%v)", name, shardByAllLabels, useStreamPush), func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRate = 20 + limits.IngestionBurstSize = 20 + + ds, _, regs, _ := prepare(t, prepConfig{ + numIngesters: tc.numIngesters, + happyIngesters: tc.happyIngesters, + numDistributors: 1, + shardByAllLabels: shardByAllLabels, + limits: limits, + errFail: tc.ingesterError, + useStreamPush: useStreamPush, + }) - ds, _, regs, _ := prepare(t, prepConfig{ - numIngesters: tc.numIngesters, - happyIngesters: tc.happyIngesters, - numDistributors: 1, - shardByAllLabels: shardByAllLabels, - limits: limits, - errFail: tc.ingesterError, + var request *cortexpb.WriteRequest + if !tc.histogramSamples { + request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0) + } else { + request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num) + } + response, err := ds[0].Push(ctx, request) + assert.Equal(t, tc.expectedResponse, response) + assert.Equal(t, status.Code(tc.expectedError), status.Code(err)) + + // Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum + // is reached, when we reach this point the 3rd ingester may not have received series/metadata + // yet. To avoid flaky test we retry metrics assertion until we hit the desired state (no error) + // within a reasonable timeout. + if tc.expectedMetrics != "" { + test.Poll(t, time.Second, nil, func() interface{} { + return testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), tc.metricNames...) + }) + } }) - - var request *cortexpb.WriteRequest - if !tc.histogramSamples { - request = makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata, 0) - } else { - request = makeWriteRequest(tc.samples.startTimestampMs, 0, tc.metadata, tc.samples.num) - } - response, err := ds[0].Push(ctx, request) - assert.Equal(t, tc.expectedResponse, response) - assert.Equal(t, status.Code(tc.expectedError), status.Code(err)) - - // Check tracked Prometheus metrics. Since the Push() response is sent as soon as the quorum - // is reached, when we reach this point the 3rd ingester may not have received series/metadata - // yet. To avoid flaky test we retry metrics assertion until we hit the desired state (no error) - // within a reasonable timeout. - if tc.expectedMetrics != "" { - test.Poll(t, time.Second, nil, func() interface{} { - return testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), tc.metricNames...) - }) - } - }) + } } } } @@ -2341,6 +2345,7 @@ func BenchmarkDistributor_Push(b *testing.B) { distributorCfg.IngesterClientFactory = func(addr string) (ring_client.PoolClient, error) { return &noopIngester{}, nil } + distributorCfg.UseStreamPush = false overrides, err := validation.NewOverrides(limits, nil) require.NoError(b, err) @@ -2837,6 +2842,7 @@ type prepConfig struct { enableTracker bool errFail error tokens [][]uint32 + useStreamPush bool } type prepState struct { @@ -2951,6 +2957,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests distributorCfg.InstanceLimits.MaxInflightClientRequests = cfg.maxInflightClientRequests distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate + distributorCfg.UseStreamPush = cfg.useStreamPush if cfg.shuffleShardEnabled { distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle @@ -3308,6 +3315,10 @@ func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWr return i.Push(ctx, &in.WriteRequest, opts...) } +func (i *mockIngester) PushStreamConnection(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return i.Push(ctx, in, opts...) +} + func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { i.Lock() defer i.Unlock() @@ -3562,6 +3573,10 @@ func (i *noopIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt return nil, nil } +func (i *noopIngester) PushStreamConnection(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return nil, nil +} + type queryStream struct { grpc.ClientStream i int diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index b1c5a8b28a..4611189dee 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -3,8 +3,12 @@ package client import ( "context" "flag" + "fmt" + "io" + "sync" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock" @@ -12,6 +16,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -31,6 +37,8 @@ var ingesterClientInflightPushRequests = promauto.NewGaugeVec(prometheus.GaugeOp var errTooManyInflightPushRequests = errors.New("too many inflight push requests in ingester client") +const INGESTER_CLIENT_STREAM_WORKER_COUNT = 100 + // ClosableClientConn is grpc.ClientConnInterface with Close function type ClosableClientConn interface { grpc.ClientConnInterface @@ -43,6 +51,15 @@ type HealthAndIngesterClient interface { grpc_health_v1.HealthClient Close() error PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) + PushStreamConnection(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) +} + +type streamWriteJob struct { + req *cortexpb.StreamWriteRequest + resp *cortexpb.WriteResponse + ctx context.Context + cancel context.CancelFunc + err error } type closableHealthAndIngesterClient struct { @@ -53,6 +70,9 @@ type closableHealthAndIngesterClient struct { maxInflightPushRequests int64 inflightRequests atomic.Int64 inflightPushRequests *prometheus.GaugeVec + streamPushChan chan *streamWriteJob + streamCtx context.Context + streamCancel context.CancelFunc } func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { @@ -72,6 +92,38 @@ func (c *closableHealthAndIngesterClient) Push(ctx context.Context, in *cortexpb }) } +func (c *closableHealthAndIngesterClient) PushStreamConnection(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return c.handlePushRequest(func() (*cortexpb.WriteResponse, error) { + select { + case <-c.streamCtx.Done(): + return nil, errors.Wrap(c.streamCtx.Err(), "ingester client stream connection closed") + default: + } + + tenantID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + + streamReq := &cortexpb.StreamWriteRequest{ + TenantID: tenantID, + Request: in, + } + + reqCtx, reqCancel := context.WithCancel(ctx) + defer reqCancel() + + job := &streamWriteJob{ + req: streamReq, + ctx: reqCtx, + cancel: reqCancel, + } + c.streamPushChan <- job + <-reqCtx.Done() + return job.resp, job.err + }) +} + func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*cortexpb.WriteResponse, error)) (*cortexpb.WriteResponse, error) { currentInflight := c.inflightRequests.Inc() c.inflightPushRequests.WithLabelValues(c.addr).Set(float64(currentInflight)) @@ -85,7 +137,7 @@ func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*co } // MakeIngesterClient makes a new IngesterClient -func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) { +func MakeIngesterClient(addr string, cfg Config, useStreamConnection bool) (HealthAndIngesterClient, error) { dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration)) if err != nil { return nil, err @@ -94,21 +146,116 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error if err != nil { return nil, err } - return &closableHealthAndIngesterClient{ + c := &closableHealthAndIngesterClient{ IngesterClient: NewIngesterClient(conn), HealthClient: grpc_health_v1.NewHealthClient(conn), conn: conn, addr: addr, maxInflightPushRequests: cfg.MaxInflightPushRequests, inflightPushRequests: ingesterClientInflightPushRequests, - }, nil + } + if useStreamConnection { + streamCtx, streamCancel := context.WithCancel(context.Background()) + err = c.Run(make(chan *streamWriteJob, INGESTER_CLIENT_STREAM_WORKER_COUNT), streamCtx, streamCancel) + if err != nil { + return nil, err + } + } + return c, nil } func (c *closableHealthAndIngesterClient) Close() error { c.inflightPushRequests.DeleteLabelValues(c.addr) + + if c.streamCancel != nil { + c.streamCancel() + } + + if c.streamPushChan != nil { + drainingLoop: + for { + select { + case job, ok := <-c.streamPushChan: + if !ok { + break drainingLoop + } + if job != nil && job.cancel != nil { + job.err = errors.New("stream connection ingester client closing") + job.cancel() + } + default: + close(c.streamPushChan) + break drainingLoop + } + } + } + return c.conn.Close() } +func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJob, streamCtx context.Context, streamCancel context.CancelFunc) error { + c.streamPushChan = streamPushChan + c.streamCtx = streamCtx + c.streamCancel = streamCancel + + var workerErr error + var wg sync.WaitGroup + for i := 0; i < INGESTER_CLIENT_STREAM_WORKER_COUNT; i++ { + workerName := fmt.Sprintf("stream-push-worker-%d", i) + wg.Add(1) + go func() { + workerCtx := user.InjectOrgID(streamCtx, workerName) + err := c.worker(workerCtx) + if err != nil { + workerErr = err + } + wg.Done() + }() + } + wg.Wait() + return workerErr +} + +func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error { + stream, err := c.PushStream(ctx) + if err != nil { + return err + } + go func() { + for { + select { + case <-ctx.Done(): + return + case job := <-c.streamPushChan: + err = stream.Send(job.req) + if err == io.EOF { + job.resp = &cortexpb.WriteResponse{} + job.cancel() + return + } + if err != nil { + job.err = err + job.cancel() + continue + } + resp, err := stream.Recv() + if err == io.EOF { + job.resp = &cortexpb.WriteResponse{} + job.cancel() + return + } + job.resp = resp + job.err = err + if err == nil && job.resp.GetGRPCResponse() != nil { + job.err = httpgrpc.ErrorFromHTTPResponse(job.resp.GetGRPCResponse()) + } + job.cancel() + } + } + }() + return nil +} + // Config is the configuration struct for the ingester client type Config struct { GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"` diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 55e9a40a9e..da41b03636 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -5,6 +5,7 @@ import ( "net/http/httptest" "strconv" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -131,3 +132,98 @@ func (m *mockClientConn) Invoke(_ context.Context, _ string, _ any, _ any, _ ... func (m *mockClientConn) Close() error { return nil } + +func TestClosableHealthAndIngesterClient_Close_Basic(t *testing.T) { + client := &closableHealthAndIngesterClient{ + conn: &mockClientConn{}, + addr: "test-addr", + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), + } + + err := client.Close() + assert.NoError(t, err) +} + +func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + streamChan := make(chan *streamWriteJob, 1) + + jobCtx, jobCancel := context.WithCancel(context.Background()) + job := &streamWriteJob{ + ctx: jobCtx, + cancel: jobCancel, + } + streamChan <- job + + client := &closableHealthAndIngesterClient{ + conn: &mockClientConn{}, + addr: "test-addr", + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), + streamCtx: ctx, + streamCancel: cancel, + streamPushChan: streamChan, + } + + err := client.Close() + assert.NoError(t, err) + + // Verify stream channel is closed + _, ok := <-client.streamPushChan + assert.False(t, ok, "stream channel should be closed") + + // Verify context is cancelled + select { + case <-client.streamCtx.Done(): + // Success - context was cancelled + case <-time.After(100 * time.Millisecond): + t.Error("stream context was not cancelled") + } +} + +func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + streamChan := make(chan *streamWriteJob, 2) + + job1Cancelled := false + job2Cancelled := false + + job1 := &streamWriteJob{ + ctx: context.Background(), + cancel: func() { + job1Cancelled = true + }, + } + job2 := &streamWriteJob{ + ctx: context.Background(), + cancel: func() { + job2Cancelled = true + }, + } + streamChan <- job1 + streamChan <- job2 + + client := &closableHealthAndIngesterClient{ + conn: &mockClientConn{}, + addr: "test-addr", + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), + streamCtx: ctx, + streamCancel: cancel, + streamPushChan: streamChan, + } + + err := client.Close() + assert.NoError(t, err) + + _, ok := <-client.streamPushChan + assert.False(t, ok, "stream channel should be closed") + + select { + case <-client.streamCtx.Done(): + case <-time.After(500 * time.Millisecond): + t.Error("stream context was not cancelled") + } + + // Verify jobs were cancelled + assert.True(t, job1Cancelled, "job1 should have been cancelled") + assert.True(t, job2Cancelled, "job2 should have been cancelled") +} diff --git a/pkg/ingester/client/cortex_mock_test.go b/pkg/ingester/client/cortex_mock_test.go index fd98c77082..e9e493a020 100644 --- a/pkg/ingester/client/cortex_mock_test.go +++ b/pkg/ingester/client/cortex_mock_test.go @@ -17,6 +17,11 @@ func (m *IngesterServerMock) Push(ctx context.Context, r *cortexpb.WriteRequest) return args.Get(0).(*cortexpb.WriteResponse), args.Error(1) } +func (m *IngesterServerMock) PushStream(srv Ingester_PushStreamServer) error { + args := m.Called(srv) + return args.Error(0) +} + func (m *IngesterServerMock) Query(ctx context.Context, r *QueryRequest) (*QueryResponse, error) { args := m.Called(ctx, r) return args.Get(0).(*QueryResponse), args.Error(1) diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go index ae8937d9ed..cf23254f8d 100644 --- a/pkg/ingester/client/ingester.pb.go +++ b/pkg/ingester/client/ingester.pb.go @@ -1508,93 +1508,94 @@ func init() { func init() { proto.RegisterFile("ingester.proto", fileDescriptor_60f6df4f3586b478) } var fileDescriptor_60f6df4f3586b478 = []byte{ - // 1369 bytes of a gzipped FileDescriptorProto + // 1386 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x58, 0x4b, 0x6f, 0x14, 0xc7, - 0x13, 0xdf, 0xf1, 0x3e, 0xec, 0xad, 0x7d, 0xb0, 0x6e, 0x1b, 0xbc, 0x0c, 0x7f, 0xc6, 0x30, 0x88, - 0x7f, 0xac, 0x24, 0xd8, 0xe0, 0x24, 0x12, 0xe4, 0x85, 0x6c, 0x30, 0x60, 0xc0, 0x18, 0xc6, 0x86, - 0x44, 0x51, 0xa2, 0xd1, 0x78, 0xb7, 0xb1, 0x27, 0xcc, 0x8b, 0xe9, 0x5e, 0x04, 0x39, 0x25, 0xca, - 0x07, 0x48, 0x8e, 0xb9, 0xe6, 0x96, 0x0f, 0x90, 0x0f, 0xc1, 0x91, 0x43, 0x0e, 0x28, 0x07, 0x14, - 0x16, 0x29, 0xca, 0x91, 0x7c, 0x83, 0x68, 0xfa, 0x31, 0x2f, 0x8f, 0x1f, 0x44, 0x90, 0xdb, 0x74, - 0xd5, 0xaf, 0xaa, 0xab, 0x7e, 0x5d, 0xdd, 0x55, 0xbb, 0xd0, 0xb6, 0xbd, 0x4d, 0x4c, 0x28, 0x0e, - 0x67, 0x83, 0xd0, 0xa7, 0x3e, 0xaa, 0xf5, 0xfc, 0x90, 0xe2, 0x87, 0xea, 0xe4, 0xa6, 0xbf, 0xe9, - 0x33, 0xd1, 0x5c, 0xf4, 0xc5, 0xb5, 0xea, 0xb9, 0x4d, 0x9b, 0x6e, 0x0d, 0x36, 0x66, 0x7b, 0xbe, - 0x3b, 0xc7, 0x81, 0x41, 0xe8, 0x7f, 0x8d, 0x7b, 0x54, 0xac, 0xe6, 0x82, 0x7b, 0x9b, 0x52, 0xb1, - 0x21, 0x3e, 0xb8, 0xa9, 0xfe, 0x09, 0x34, 0x0c, 0x6c, 0xf5, 0x0d, 0x7c, 0x7f, 0x80, 0x09, 0x45, - 0xb3, 0x30, 0x7a, 0x7f, 0x80, 0x43, 0x1b, 0x93, 0xae, 0x72, 0xac, 0x3c, 0xd3, 0x98, 0x9f, 0x9c, - 0x15, 0xf0, 0x5b, 0x03, 0x1c, 0x3e, 0x12, 0x30, 0x43, 0x82, 0xf4, 0xf3, 0xd0, 0xe4, 0xe6, 0x24, - 0xf0, 0x3d, 0x82, 0xd1, 0x1c, 0x8c, 0x86, 0x98, 0x0c, 0x1c, 0x2a, 0xed, 0x0f, 0xe6, 0xec, 0x39, - 0xce, 0x90, 0x28, 0xfd, 0x1a, 0xb4, 0x32, 0x1a, 0xf4, 0x21, 0x00, 0xb5, 0x5d, 0x4c, 0x8a, 0x82, - 0x08, 0x36, 0x66, 0xd7, 0x6d, 0x17, 0xaf, 0x31, 0xdd, 0x62, 0xe5, 0xf1, 0xb3, 0xe9, 0x92, 0x91, - 0x42, 0xeb, 0x3f, 0x29, 0xd0, 0x4c, 0xc7, 0x89, 0xde, 0x05, 0x44, 0xa8, 0x15, 0x52, 0x93, 0x81, - 0xa8, 0xe5, 0x06, 0xa6, 0x1b, 0x39, 0x55, 0x66, 0xca, 0x46, 0x87, 0x69, 0xd6, 0xa5, 0x62, 0x85, - 0xa0, 0x19, 0xe8, 0x60, 0xaf, 0x9f, 0xc5, 0x8e, 0x30, 0x6c, 0x1b, 0x7b, 0xfd, 0x34, 0xf2, 0x34, - 0x8c, 0xb9, 0x16, 0xed, 0x6d, 0xe1, 0x90, 0x74, 0xcb, 0x59, 0x9e, 0xae, 0x5b, 0x1b, 0xd8, 0x59, - 0xe1, 0x4a, 0x23, 0x46, 0xe9, 0x3f, 0x2b, 0x30, 0xb9, 0xf4, 0x10, 0xbb, 0x81, 0x63, 0x85, 0xff, - 0x49, 0x88, 0x67, 0xb6, 0x85, 0x78, 0xb0, 0x28, 0x44, 0x92, 0x8a, 0xf1, 0x4b, 0x98, 0x60, 0xa1, - 0xad, 0xd1, 0x10, 0x5b, 0x6e, 0x7c, 0x22, 0xe7, 0xa1, 0xd1, 0xdb, 0x1a, 0x78, 0xf7, 0x32, 0x47, - 0x32, 0x25, 0x9d, 0x25, 0x07, 0x72, 0x21, 0x02, 0x89, 0x53, 0x49, 0x5b, 0x5c, 0xad, 0x8c, 0x8d, - 0x74, 0xca, 0xfa, 0x1a, 0x1c, 0xcc, 0x11, 0xf0, 0x1a, 0x4e, 0xfc, 0x37, 0x05, 0x10, 0x4b, 0xe7, - 0x8e, 0xe5, 0x0c, 0x30, 0x91, 0xa4, 0x1e, 0x05, 0x70, 0x22, 0xa9, 0xe9, 0x59, 0x2e, 0x66, 0x64, - 0xd6, 0x8d, 0x3a, 0x93, 0xdc, 0xb0, 0x5c, 0xbc, 0x03, 0xe7, 0x23, 0xaf, 0xc0, 0x79, 0x79, 0x4f, - 0xce, 0x2b, 0xc7, 0x94, 0x7d, 0x70, 0x8e, 0x26, 0xa1, 0xea, 0xd8, 0xae, 0x4d, 0xbb, 0x55, 0xe6, - 0x91, 0x2f, 0xf4, 0xb3, 0x30, 0x91, 0xc9, 0x4a, 0x30, 0x75, 0x1c, 0x9a, 0x3c, 0xad, 0x07, 0x4c, - 0xce, 0xb8, 0xaa, 0x1b, 0x0d, 0x27, 0x81, 0xea, 0x9f, 0xc2, 0xe1, 0x94, 0x65, 0xee, 0x24, 0xf7, - 0x61, 0xff, 0xab, 0x02, 0xe3, 0xd7, 0x25, 0x51, 0xe4, 0x4d, 0x17, 0x69, 0x9c, 0x7d, 0x39, 0x95, - 0xfd, 0xbf, 0xa0, 0x51, 0xff, 0x40, 0x94, 0x81, 0x88, 0x5a, 0xe4, 0x3b, 0x0d, 0x8d, 0xa4, 0x0c, - 0x64, 0xba, 0x10, 0xd7, 0x01, 0xd1, 0x3f, 0x82, 0x6e, 0x62, 0x96, 0x23, 0x6b, 0x4f, 0x63, 0x04, - 0x9d, 0xdb, 0x04, 0x87, 0x6b, 0xd4, 0xa2, 0x92, 0x28, 0xfd, 0xbb, 0x11, 0x18, 0x4f, 0x09, 0x85, - 0xab, 0x93, 0xf2, 0x3d, 0xb7, 0x7d, 0xcf, 0x0c, 0x2d, 0xca, 0x4b, 0x52, 0x31, 0x5a, 0xb1, 0xd4, - 0xb0, 0x28, 0x8e, 0xaa, 0xd6, 0x1b, 0xb8, 0xa6, 0xb8, 0x08, 0x11, 0x63, 0x15, 0xa3, 0xee, 0x0d, - 0x5c, 0x5e, 0xfd, 0xd1, 0x21, 0x58, 0x81, 0x6d, 0xe6, 0x3c, 0x95, 0x99, 0xa7, 0x8e, 0x15, 0xd8, - 0xcb, 0x19, 0x67, 0xb3, 0x30, 0x11, 0x0e, 0x1c, 0x9c, 0x87, 0x57, 0x18, 0x7c, 0x3c, 0x52, 0x65, - 0xf1, 0x27, 0xa0, 0x65, 0xf5, 0xa8, 0xfd, 0x00, 0xcb, 0xfd, 0xab, 0x6c, 0xff, 0x26, 0x17, 0x8a, - 0x10, 0x4e, 0x40, 0xcb, 0xf1, 0xad, 0x3e, 0xee, 0x9b, 0x1b, 0x8e, 0xdf, 0xbb, 0x47, 0xba, 0x35, - 0x0e, 0xe2, 0xc2, 0x45, 0x26, 0xd3, 0xbf, 0x82, 0x89, 0x88, 0x82, 0xe5, 0x8b, 0x59, 0x12, 0xa6, - 0x60, 0x74, 0x40, 0x70, 0x68, 0xda, 0x7d, 0x71, 0x21, 0x6b, 0xd1, 0x72, 0xb9, 0x8f, 0x4e, 0x41, - 0xa5, 0x6f, 0x51, 0x8b, 0x25, 0xdc, 0x98, 0x3f, 0x2c, 0x8f, 0x7a, 0x1b, 0x8d, 0x06, 0x83, 0xe9, - 0x97, 0x01, 0x45, 0x2a, 0x92, 0xf5, 0x7e, 0x06, 0xaa, 0x24, 0x12, 0x88, 0xf7, 0xe3, 0x48, 0xda, - 0x4b, 0x2e, 0x12, 0x83, 0x23, 0xf5, 0xc7, 0x0a, 0x68, 0x2b, 0x98, 0x86, 0x76, 0x8f, 0x5c, 0xf2, - 0xc3, 0x6c, 0x65, 0xbd, 0xe1, 0xba, 0x3f, 0x0b, 0x4d, 0x59, 0xba, 0x26, 0xc1, 0x74, 0xf7, 0x07, - 0xba, 0x21, 0xa1, 0x6b, 0x98, 0x26, 0x37, 0xa6, 0x92, 0x7e, 0x2f, 0xae, 0xc1, 0xf4, 0x8e, 0x99, - 0x08, 0x82, 0x66, 0xa0, 0xe6, 0x32, 0x88, 0x60, 0xa8, 0x93, 0xbc, 0xb0, 0xdc, 0xd4, 0x10, 0x7a, - 0xfd, 0x16, 0x9c, 0xdc, 0xc1, 0x59, 0xee, 0x86, 0xec, 0xdf, 0x65, 0x00, 0x87, 0x84, 0xcb, 0x15, - 0x4c, 0xad, 0xe8, 0x18, 0x25, 0xc3, 0x71, 0x3e, 0x4a, 0xfa, 0x05, 0x98, 0x81, 0x0e, 0xfb, 0x30, - 0x03, 0x1c, 0x9a, 0x62, 0x0f, 0xc1, 0x24, 0x93, 0xdf, 0xc4, 0x21, 0xf7, 0x87, 0x0e, 0xc5, 0x31, - 0x94, 0x79, 0x51, 0x89, 0x1d, 0x57, 0x61, 0x6a, 0xdb, 0x8e, 0x22, 0xec, 0xf7, 0x61, 0xcc, 0x15, - 0x32, 0x11, 0x78, 0x37, 0x1f, 0x78, 0x6c, 0x13, 0x23, 0xf5, 0xbf, 0x15, 0x38, 0x90, 0xeb, 0x75, - 0x51, 0x98, 0x77, 0x43, 0xdf, 0x35, 0xe5, 0xb0, 0x96, 0xd4, 0x76, 0x3b, 0x92, 0x2f, 0x0b, 0xf1, - 0x72, 0x3f, 0x5d, 0xfc, 0x23, 0x99, 0xe2, 0xf7, 0xa0, 0xc6, 0x9e, 0x14, 0xd9, 0xa4, 0x27, 0x92, - 0x50, 0x18, 0xf5, 0x37, 0x2d, 0x3b, 0x5c, 0x5c, 0x88, 0xfa, 0xde, 0xef, 0xcf, 0xa6, 0x5f, 0x69, - 0xce, 0xe3, 0xf6, 0x0b, 0x7d, 0x2b, 0xa0, 0x38, 0x34, 0xc4, 0x2e, 0xe8, 0x1d, 0xa8, 0xf1, 0xd6, - 0xdc, 0xad, 0xb0, 0xfd, 0x5a, 0xb2, 0xe6, 0xd2, 0xdd, 0x5b, 0x40, 0xf4, 0x1f, 0x14, 0xa8, 0xf2, - 0x4c, 0xdf, 0xd4, 0x45, 0x50, 0x61, 0x0c, 0x7b, 0x3d, 0xbf, 0x6f, 0x7b, 0x9b, 0xec, 0x00, 0xab, - 0x46, 0xbc, 0x46, 0x48, 0xbc, 0x0b, 0x51, 0xa5, 0x37, 0xc5, 0xe5, 0x5f, 0x80, 0x56, 0xa6, 0x22, - 0x33, 0x93, 0x98, 0xb2, 0xaf, 0x49, 0xcc, 0x84, 0x66, 0x5a, 0x83, 0x4e, 0x42, 0x85, 0x3e, 0x0a, - 0xf8, 0x93, 0xdc, 0x9e, 0x1f, 0x97, 0xd6, 0x4c, 0xbd, 0xfe, 0x28, 0xc0, 0x06, 0x53, 0x47, 0xd1, - 0xb0, 0x61, 0x82, 0x1f, 0x1f, 0xfb, 0x8e, 0x8a, 0x97, 0x75, 0x52, 0x51, 0x7b, 0x7c, 0xa1, 0x7f, - 0xaf, 0x40, 0x3b, 0xa9, 0x94, 0x4b, 0xb6, 0x83, 0x5f, 0x47, 0xa1, 0xa8, 0x30, 0x76, 0xd7, 0x76, - 0x30, 0x8b, 0x81, 0x6f, 0x17, 0xaf, 0x8b, 0x98, 0x7a, 0xfb, 0x2a, 0xd4, 0xe3, 0x14, 0x50, 0x1d, - 0xaa, 0x4b, 0xb7, 0x6e, 0x2f, 0x5c, 0xef, 0x94, 0x50, 0x0b, 0xea, 0x37, 0x56, 0xd7, 0x4d, 0xbe, - 0x54, 0xd0, 0x01, 0x68, 0x18, 0x4b, 0x97, 0x97, 0x3e, 0x37, 0x57, 0x16, 0xd6, 0x2f, 0x5c, 0xe9, - 0x8c, 0x20, 0x04, 0x6d, 0x2e, 0xb8, 0xb1, 0x2a, 0x64, 0xe5, 0xf9, 0x3f, 0x47, 0x61, 0x4c, 0xc6, - 0x88, 0xce, 0x41, 0xe5, 0xe6, 0x80, 0x6c, 0xa1, 0x43, 0x49, 0xa5, 0x7e, 0x16, 0xda, 0x14, 0x8b, - 0x1b, 0xad, 0x4e, 0x6d, 0x93, 0xf3, 0x7b, 0xa7, 0x97, 0xd0, 0x45, 0x68, 0xa4, 0x06, 0x4c, 0x54, - 0xf8, 0xdb, 0x42, 0x3d, 0x92, 0x91, 0x66, 0x9f, 0x1c, 0xbd, 0x74, 0x5a, 0x41, 0xab, 0xd0, 0x66, - 0x2a, 0x39, 0x4d, 0x12, 0xf4, 0x3f, 0x69, 0x52, 0x34, 0x61, 0xab, 0x47, 0x77, 0xd0, 0xc6, 0x61, - 0x5d, 0x81, 0x46, 0x6a, 0x66, 0x42, 0x6a, 0xa6, 0x80, 0x32, 0x83, 0x65, 0x12, 0x5c, 0xc1, 0x78, - 0xa6, 0x97, 0xd0, 0x1d, 0x31, 0x3c, 0xa5, 0xa7, 0xaf, 0x5d, 0xfd, 0x1d, 0x2f, 0xd0, 0x15, 0xa4, - 0xbc, 0x04, 0x90, 0xcc, 0x29, 0xe8, 0x70, 0xc6, 0x28, 0x3d, 0xa8, 0xa9, 0x6a, 0x91, 0x2a, 0x0e, - 0x6f, 0x0d, 0x3a, 0xf9, 0x71, 0x67, 0x37, 0x67, 0xc7, 0xb6, 0xab, 0x0a, 0x62, 0x5b, 0x84, 0x7a, - 0xdc, 0xaa, 0x51, 0xb7, 0xa0, 0x7b, 0x73, 0x67, 0x3b, 0xf7, 0x75, 0xbd, 0x84, 0x2e, 0x41, 0x73, - 0xc1, 0x71, 0xf6, 0xe3, 0x46, 0x4d, 0x6b, 0x48, 0xde, 0x8f, 0x13, 0xbf, 0xfa, 0xf9, 0xd6, 0x85, - 0xfe, 0x1f, 0x5f, 0xec, 0x5d, 0x5b, 0xbe, 0xfa, 0xd6, 0x9e, 0xb8, 0x78, 0xb7, 0x6f, 0xe0, 0xe8, - 0xae, 0x8d, 0x72, 0xdf, 0x7b, 0x9e, 0xda, 0x03, 0x57, 0xc0, 0xfa, 0x3a, 0x1c, 0xc8, 0xf5, 0x37, - 0xa4, 0xe5, 0xbc, 0xe4, 0x5a, 0xad, 0x3a, 0xbd, 0xa3, 0x5e, 0xfa, 0x5d, 0xfc, 0xf8, 0xc9, 0x73, - 0xad, 0xf4, 0xf4, 0xb9, 0x56, 0x7a, 0xf9, 0x5c, 0x53, 0xbe, 0x1d, 0x6a, 0xca, 0x2f, 0x43, 0x4d, - 0x79, 0x3c, 0xd4, 0x94, 0x27, 0x43, 0x4d, 0xf9, 0x63, 0xa8, 0x29, 0x7f, 0x0d, 0xb5, 0xd2, 0xcb, - 0xa1, 0xa6, 0xfc, 0xf8, 0x42, 0x2b, 0x3d, 0x79, 0xa1, 0x95, 0x9e, 0xbe, 0xd0, 0x4a, 0x5f, 0xd4, - 0x7a, 0x8e, 0x8d, 0x3d, 0xba, 0x51, 0x63, 0x7f, 0x29, 0xbc, 0xf7, 0x4f, 0x00, 0x00, 0x00, 0xff, - 0xff, 0x73, 0x37, 0x7c, 0x02, 0xbd, 0x10, 0x00, 0x00, + 0x13, 0xdf, 0xf6, 0x3e, 0xd8, 0xad, 0x7d, 0xb0, 0x6e, 0x1b, 0xbc, 0x0c, 0x7f, 0xc6, 0x30, 0x88, + 0x7f, 0xac, 0x24, 0xac, 0xc1, 0x49, 0x24, 0xc8, 0x0b, 0xd9, 0x60, 0xc0, 0x80, 0x31, 0x8c, 0x0d, + 0x89, 0xa2, 0x44, 0xa3, 0xf1, 0x6e, 0x63, 0x4f, 0x98, 0x17, 0x33, 0xbd, 0x08, 0x72, 0x4a, 0x94, + 0x0f, 0x90, 0x1c, 0x73, 0xcd, 0x2d, 0xd7, 0x48, 0xf9, 0x10, 0x1c, 0x39, 0xe4, 0x80, 0x72, 0x40, + 0x61, 0xb9, 0xe4, 0x48, 0xbe, 0x41, 0x34, 0xdd, 0x3d, 0x4f, 0xcf, 0xda, 0x4b, 0x84, 0x73, 0x9b, + 0xae, 0xfa, 0x55, 0x75, 0xd5, 0xaf, 0xab, 0xbb, 0x6a, 0x17, 0x5a, 0x86, 0xbd, 0x45, 0x7c, 0x4a, + 0xbc, 0xae, 0xeb, 0x39, 0xd4, 0xc1, 0x95, 0x9e, 0xe3, 0x51, 0xf2, 0x48, 0x9a, 0xde, 0x72, 0xb6, + 0x1c, 0x26, 0x9a, 0x0f, 0xbe, 0xb8, 0x56, 0x3a, 0xbf, 0x65, 0xd0, 0xed, 0xc1, 0x66, 0xb7, 0xe7, + 0x58, 0xf3, 0x1c, 0xe8, 0x7a, 0xce, 0xd7, 0xa4, 0x47, 0xc5, 0x6a, 0xde, 0xbd, 0xbf, 0x15, 0x2a, + 0x36, 0xc5, 0x07, 0x37, 0x55, 0x3e, 0x81, 0xba, 0x4a, 0xf4, 0xbe, 0x4a, 0x1e, 0x0c, 0x88, 0x4f, + 0x71, 0x17, 0x0e, 0x3c, 0x18, 0x10, 0xcf, 0x20, 0x7e, 0x07, 0x1d, 0x2f, 0xce, 0xd5, 0x17, 0xa6, + 0xbb, 0x02, 0x7e, 0x7b, 0x40, 0xbc, 0xc7, 0x02, 0xa6, 0x86, 0x20, 0xe5, 0x02, 0x34, 0xb8, 0xb9, + 0xef, 0x3a, 0xb6, 0x4f, 0xf0, 0x3c, 0x1c, 0xf0, 0x88, 0x3f, 0x30, 0x69, 0x68, 0x7f, 0x28, 0x63, + 0xcf, 0x71, 0x6a, 0x88, 0x52, 0xae, 0x43, 0x33, 0xa5, 0xc1, 0x1f, 0x02, 0x50, 0xc3, 0x22, 0x7e, + 0x5e, 0x10, 0xee, 0x66, 0x77, 0xc3, 0xb0, 0xc8, 0x3a, 0xd3, 0x2d, 0x95, 0x9e, 0x3c, 0x9f, 0x2d, + 0xa8, 0x09, 0xb4, 0xf2, 0x13, 0x82, 0x46, 0x32, 0x4e, 0xfc, 0x2e, 0x60, 0x9f, 0xea, 0x1e, 0xd5, + 0x18, 0x88, 0xea, 0x96, 0xab, 0x59, 0x81, 0x53, 0x34, 0x57, 0x54, 0xdb, 0x4c, 0xb3, 0x11, 0x2a, + 0x56, 0x7d, 0x3c, 0x07, 0x6d, 0x62, 0xf7, 0xd3, 0xd8, 0x09, 0x86, 0x6d, 0x11, 0xbb, 0x9f, 0x44, + 0x9e, 0x81, 0xaa, 0xa5, 0xd3, 0xde, 0x36, 0xf1, 0xfc, 0x4e, 0x31, 0xcd, 0xd3, 0x0d, 0x7d, 0x93, + 0x98, 0xab, 0x5c, 0xa9, 0x46, 0x28, 0xe5, 0x67, 0x04, 0xd3, 0xcb, 0x8f, 0x88, 0xe5, 0x9a, 0xba, + 0xf7, 0x9f, 0x84, 0x78, 0x76, 0x47, 0x88, 0x87, 0xf2, 0x42, 0xf4, 0x13, 0x31, 0x7e, 0x09, 0x53, + 0x2c, 0xb4, 0x75, 0xea, 0x11, 0xdd, 0x8a, 0x4e, 0xe4, 0x02, 0xd4, 0x7b, 0xdb, 0x03, 0xfb, 0x7e, + 0xea, 0x48, 0x66, 0x42, 0x67, 0xf1, 0x81, 0x5c, 0x0c, 0x40, 0xe2, 0x54, 0x92, 0x16, 0xd7, 0x4a, + 0xd5, 0x89, 0x76, 0x51, 0x59, 0x87, 0x43, 0x19, 0x02, 0xde, 0xc0, 0x89, 0xff, 0x8e, 0x00, 0xb3, + 0x74, 0xee, 0xea, 0xe6, 0x80, 0xf8, 0x21, 0xa9, 0xc7, 0x00, 0xcc, 0x40, 0xaa, 0xd9, 0xba, 0x45, + 0x18, 0x99, 0x35, 0xb5, 0xc6, 0x24, 0x37, 0x75, 0x8b, 0x8c, 0xe0, 0x7c, 0xe2, 0x35, 0x38, 0x2f, + 0xee, 0xc9, 0x79, 0xe9, 0x38, 0x1a, 0x83, 0x73, 0x3c, 0x0d, 0x65, 0xd3, 0xb0, 0x0c, 0xda, 0x29, + 0x33, 0x8f, 0x7c, 0xa1, 0x9c, 0x83, 0xa9, 0x54, 0x56, 0x82, 0xa9, 0x13, 0xd0, 0xe0, 0x69, 0x3d, + 0x64, 0x72, 0xc6, 0x55, 0x4d, 0xad, 0x9b, 0x31, 0x54, 0xf9, 0x14, 0x8e, 0x24, 0x2c, 0x33, 0x27, + 0x39, 0x86, 0xfd, 0x6f, 0x08, 0x26, 0x6f, 0x84, 0x44, 0xf9, 0xfb, 0x5d, 0xa4, 0x51, 0xf6, 0xc5, + 0x44, 0xf6, 0xff, 0x82, 0x46, 0xe5, 0x03, 0x51, 0x06, 0x22, 0x6a, 0x91, 0xef, 0x2c, 0xd4, 0xe3, + 0x32, 0x08, 0xd3, 0x85, 0xa8, 0x0e, 0x7c, 0xe5, 0x23, 0xe8, 0xc4, 0x66, 0x19, 0xb2, 0xf6, 0x34, + 0xc6, 0xd0, 0xbe, 0xe3, 0x13, 0x6f, 0x9d, 0xea, 0x34, 0x24, 0x4a, 0xf9, 0x6e, 0x02, 0x26, 0x13, + 0x42, 0xe1, 0xea, 0x54, 0xf8, 0x9e, 0x1b, 0x8e, 0xad, 0x79, 0x3a, 0xe5, 0x25, 0x89, 0xd4, 0x66, + 0x24, 0x55, 0x75, 0x4a, 0x82, 0xaa, 0xb5, 0x07, 0x96, 0x26, 0x2e, 0x42, 0xc0, 0x58, 0x49, 0xad, + 0xd9, 0x03, 0x8b, 0x57, 0x7f, 0x70, 0x08, 0xba, 0x6b, 0x68, 0x19, 0x4f, 0x45, 0xe6, 0xa9, 0xad, + 0xbb, 0xc6, 0x4a, 0xca, 0x59, 0x17, 0xa6, 0xbc, 0x81, 0x49, 0xb2, 0xf0, 0x12, 0x83, 0x4f, 0x06, + 0xaa, 0x34, 0xfe, 0x24, 0x34, 0xf5, 0x1e, 0x35, 0x1e, 0x92, 0x70, 0xff, 0x32, 0xdb, 0xbf, 0xc1, + 0x85, 0x22, 0x84, 0x93, 0xd0, 0x34, 0x1d, 0xbd, 0x4f, 0xfa, 0xda, 0xa6, 0xe9, 0xf4, 0xee, 0xfb, + 0x9d, 0x0a, 0x07, 0x71, 0xe1, 0x12, 0x93, 0x29, 0x5f, 0xc1, 0x54, 0x40, 0xc1, 0xca, 0xa5, 0x34, + 0x09, 0x33, 0x70, 0x60, 0xe0, 0x13, 0x4f, 0x33, 0xfa, 0xe2, 0x42, 0x56, 0x82, 0xe5, 0x4a, 0x1f, + 0x9f, 0x86, 0x52, 0x5f, 0xa7, 0x3a, 0x4b, 0xb8, 0xbe, 0x70, 0x24, 0x3c, 0xea, 0x1d, 0x34, 0xaa, + 0x0c, 0xa6, 0x5c, 0x01, 0x1c, 0xa8, 0xfc, 0xb4, 0xf7, 0xb3, 0x50, 0xf6, 0x03, 0x81, 0x78, 0x3f, + 0x8e, 0x26, 0xbd, 0x64, 0x22, 0x51, 0x39, 0x52, 0x79, 0x82, 0x40, 0x5e, 0x25, 0xd4, 0x33, 0x7a, + 0xfe, 0x65, 0xc7, 0x4b, 0x57, 0xd6, 0x3e, 0xd7, 0xfd, 0x39, 0x68, 0x84, 0xa5, 0xab, 0xf9, 0x84, + 0xee, 0xfe, 0x40, 0xd7, 0x43, 0xe8, 0x3a, 0xa1, 0xf1, 0x8d, 0x29, 0x25, 0xdf, 0x8b, 0xeb, 0x30, + 0x3b, 0x32, 0x13, 0x41, 0xd0, 0x1c, 0x54, 0x2c, 0x06, 0x11, 0x0c, 0xb5, 0xe3, 0x17, 0x96, 0x9b, + 0xaa, 0x42, 0xaf, 0xdc, 0x86, 0x53, 0x23, 0x9c, 0x65, 0x6e, 0xc8, 0xf8, 0x2e, 0x5d, 0x38, 0x2c, + 0x5c, 0xae, 0x12, 0xaa, 0x07, 0xc7, 0x18, 0x32, 0x1c, 0xe5, 0x83, 0x92, 0x2f, 0xc0, 0x1c, 0xb4, + 0xd9, 0x87, 0xe6, 0x12, 0x4f, 0x13, 0x7b, 0x08, 0x26, 0x99, 0xfc, 0x16, 0xf1, 0xb8, 0x3f, 0x7c, + 0x38, 0x8a, 0xa1, 0xc8, 0x8b, 0x4a, 0xec, 0xb8, 0x06, 0x33, 0x3b, 0x76, 0x14, 0x61, 0xbf, 0x0f, + 0x55, 0x4b, 0xc8, 0x44, 0xe0, 0x9d, 0x6c, 0xe0, 0x91, 0x4d, 0x84, 0x54, 0xfe, 0x46, 0x70, 0x30, + 0xd3, 0xeb, 0x82, 0x30, 0xef, 0x79, 0x8e, 0xa5, 0x85, 0xc3, 0x5a, 0x5c, 0xdb, 0xad, 0x40, 0xbe, + 0x22, 0xc4, 0x2b, 0xfd, 0x64, 0xf1, 0x4f, 0xa4, 0x8a, 0xdf, 0x86, 0x0a, 0x7b, 0x52, 0xc2, 0x26, + 0x3d, 0x15, 0x87, 0xc2, 0xa8, 0xbf, 0xa5, 0x1b, 0xde, 0xd2, 0x62, 0xd0, 0xf7, 0xfe, 0x78, 0x3e, + 0xfb, 0x5a, 0x73, 0x1e, 0xb7, 0x5f, 0xec, 0xeb, 0x2e, 0x25, 0x9e, 0x2a, 0x76, 0xc1, 0xef, 0x40, + 0x85, 0xb7, 0xe6, 0x4e, 0x89, 0xed, 0xd7, 0x0c, 0x6b, 0x2e, 0xd9, 0xbd, 0x05, 0x44, 0xf9, 0x01, + 0x41, 0x99, 0x67, 0xba, 0x5f, 0x17, 0x41, 0x82, 0x2a, 0xb1, 0x7b, 0x4e, 0xdf, 0xb0, 0xb7, 0xd8, + 0x01, 0x96, 0xd5, 0x68, 0x8d, 0xb1, 0x78, 0x17, 0x82, 0x4a, 0x6f, 0x88, 0xcb, 0xbf, 0x08, 0xcd, + 0x54, 0x45, 0xa6, 0x26, 0x31, 0x34, 0xd6, 0x24, 0xa6, 0x41, 0x23, 0xa9, 0xc1, 0xa7, 0xa0, 0x44, + 0x1f, 0xbb, 0xfc, 0x49, 0x6e, 0x2d, 0x4c, 0x86, 0xd6, 0x4c, 0xbd, 0xf1, 0xd8, 0x25, 0x2a, 0x53, + 0x07, 0xd1, 0xb0, 0x61, 0x82, 0x1f, 0x1f, 0xfb, 0x0e, 0x8a, 0x97, 0x75, 0x52, 0x51, 0x7b, 0x7c, + 0xa1, 0x7c, 0x8f, 0xa0, 0x15, 0x57, 0xca, 0x65, 0xc3, 0x24, 0x6f, 0xa2, 0x50, 0x24, 0xa8, 0xde, + 0x33, 0x4c, 0xc2, 0x62, 0xe0, 0xdb, 0x45, 0xeb, 0x3c, 0xa6, 0xde, 0xbe, 0x06, 0xb5, 0x28, 0x05, + 0x5c, 0x83, 0xf2, 0xf2, 0xed, 0x3b, 0x8b, 0x37, 0xda, 0x05, 0xdc, 0x84, 0xda, 0xcd, 0xb5, 0x0d, + 0x8d, 0x2f, 0x11, 0x3e, 0x08, 0x75, 0x75, 0xf9, 0xca, 0xf2, 0xe7, 0xda, 0xea, 0xe2, 0xc6, 0xc5, + 0xab, 0xed, 0x09, 0x8c, 0xa1, 0xc5, 0x05, 0x37, 0xd7, 0x84, 0xac, 0xb8, 0xf0, 0x6b, 0x15, 0xaa, + 0x61, 0x8c, 0xf8, 0x3c, 0x94, 0x6e, 0x0d, 0xfc, 0x6d, 0x7c, 0x38, 0xae, 0xd4, 0xcf, 0x3c, 0x83, + 0x12, 0x71, 0xa3, 0xa5, 0x99, 0x1d, 0x72, 0x7e, 0xef, 0x94, 0x02, 0x5e, 0x01, 0x08, 0x4c, 0xf9, + 0x33, 0x82, 0xff, 0x17, 0x03, 0xb9, 0x64, 0x4c, 0x37, 0x73, 0xe8, 0x0c, 0xc2, 0x97, 0xa0, 0x9e, + 0x98, 0x55, 0x71, 0xee, 0xcf, 0x14, 0xe9, 0x68, 0x4a, 0x9a, 0x7e, 0xbd, 0x94, 0xc2, 0x19, 0x84, + 0xd7, 0xa0, 0xc5, 0x54, 0xe1, 0x60, 0xea, 0x47, 0x41, 0x75, 0xf3, 0x86, 0x75, 0xe9, 0xd8, 0x08, + 0x6d, 0x94, 0xe1, 0x55, 0xa8, 0x27, 0xc6, 0x2f, 0x2c, 0xa5, 0x6a, 0x31, 0x35, 0xa3, 0xc6, 0xc1, + 0xe5, 0x4c, 0x7a, 0x4a, 0x01, 0xdf, 0x15, 0x73, 0x58, 0x72, 0x90, 0xdb, 0xd5, 0xdf, 0x89, 0x1c, + 0x5d, 0x4e, 0xca, 0xcb, 0x00, 0xf1, 0xc8, 0x83, 0x8f, 0xa4, 0x8c, 0x92, 0x33, 0x9f, 0x24, 0xe5, + 0xa9, 0xa2, 0xf0, 0xd6, 0xa1, 0x9d, 0x9d, 0x9c, 0x76, 0x73, 0x76, 0x7c, 0xa7, 0x2a, 0x27, 0xb6, + 0x25, 0xa8, 0x45, 0x5d, 0x1f, 0x77, 0x72, 0x06, 0x01, 0xee, 0x6c, 0xf4, 0x88, 0xa0, 0x14, 0xf0, + 0x65, 0x68, 0x2c, 0x9a, 0xe6, 0x38, 0x6e, 0xa4, 0xa4, 0xc6, 0xcf, 0xfa, 0x31, 0xa3, 0x06, 0x92, + 0xed, 0x82, 0xf8, 0xff, 0xd1, 0x1b, 0xb1, 0xeb, 0xf4, 0x20, 0xbd, 0xb5, 0x27, 0x2e, 0xda, 0xed, + 0x1b, 0x38, 0xb6, 0x6b, 0xcf, 0x1d, 0x7b, 0xcf, 0xd3, 0x7b, 0xe0, 0x72, 0x58, 0xdf, 0x80, 0x83, + 0x99, 0x56, 0x89, 0xe5, 0x8c, 0x97, 0x4c, 0xd7, 0x96, 0x66, 0x47, 0xea, 0x43, 0xbf, 0x4b, 0x1f, + 0x3f, 0x7d, 0x21, 0x17, 0x9e, 0xbd, 0x90, 0x0b, 0xaf, 0x5e, 0xc8, 0xe8, 0xdb, 0xa1, 0x8c, 0x7e, + 0x19, 0xca, 0xe8, 0xc9, 0x50, 0x46, 0x4f, 0x87, 0x32, 0xfa, 0x73, 0x28, 0xa3, 0xbf, 0x86, 0x72, + 0xe1, 0xd5, 0x50, 0x46, 0x3f, 0xbe, 0x94, 0x0b, 0x4f, 0x5f, 0xca, 0x85, 0x67, 0x2f, 0xe5, 0xc2, + 0x17, 0x95, 0x9e, 0x69, 0x10, 0x9b, 0x6e, 0x56, 0xd8, 0xbf, 0x13, 0xef, 0xfd, 0x13, 0x00, 0x00, + 0xff, 0xff, 0xfe, 0x5e, 0xa0, 0x57, 0x08, 0x11, 0x00, 0x00, } func (x MatchType) String() string { @@ -2817,6 +2818,7 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type IngesterClient interface { Push(ctx context.Context, in *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) + PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) QueryStream(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Ingester_QueryStreamClient, error) QueryExemplars(ctx context.Context, in *ExemplarQueryRequest, opts ...grpc.CallOption) (*ExemplarQueryResponse, error) LabelValues(ctx context.Context, in *LabelValuesRequest, opts ...grpc.CallOption) (*LabelValuesResponse, error) @@ -2847,8 +2849,39 @@ func (c *ingesterClient) Push(ctx context.Context, in *cortexpb.WriteRequest, op return out, nil } +func (c *ingesterClient) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[0], "/cortex.Ingester/PushStream", opts...) + if err != nil { + return nil, err + } + x := &ingesterPushStreamClient{stream} + return x, nil +} + +type Ingester_PushStreamClient interface { + Send(*cortexpb.StreamWriteRequest) error + Recv() (*cortexpb.WriteResponse, error) + grpc.ClientStream +} + +type ingesterPushStreamClient struct { + grpc.ClientStream +} + +func (x *ingesterPushStreamClient) Send(m *cortexpb.StreamWriteRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *ingesterPushStreamClient) Recv() (*cortexpb.WriteResponse, error) { + m := new(cortexpb.WriteResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *ingesterClient) QueryStream(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Ingester_QueryStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[0], "/cortex.Ingester/QueryStream", opts...) + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[1], "/cortex.Ingester/QueryStream", opts...) if err != nil { return nil, err } @@ -2898,7 +2931,7 @@ func (c *ingesterClient) LabelValues(ctx context.Context, in *LabelValuesRequest } func (c *ingesterClient) LabelValuesStream(ctx context.Context, in *LabelValuesRequest, opts ...grpc.CallOption) (Ingester_LabelValuesStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[1], "/cortex.Ingester/LabelValuesStream", opts...) + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[2], "/cortex.Ingester/LabelValuesStream", opts...) if err != nil { return nil, err } @@ -2939,7 +2972,7 @@ func (c *ingesterClient) LabelNames(ctx context.Context, in *LabelNamesRequest, } func (c *ingesterClient) LabelNamesStream(ctx context.Context, in *LabelNamesRequest, opts ...grpc.CallOption) (Ingester_LabelNamesStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[2], "/cortex.Ingester/LabelNamesStream", opts...) + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[3], "/cortex.Ingester/LabelNamesStream", opts...) if err != nil { return nil, err } @@ -2998,7 +3031,7 @@ func (c *ingesterClient) MetricsForLabelMatchers(ctx context.Context, in *Metric } func (c *ingesterClient) MetricsForLabelMatchersStream(ctx context.Context, in *MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (Ingester_MetricsForLabelMatchersStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[3], "/cortex.Ingester/MetricsForLabelMatchersStream", opts...) + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[4], "/cortex.Ingester/MetricsForLabelMatchersStream", opts...) if err != nil { return nil, err } @@ -3041,6 +3074,7 @@ func (c *ingesterClient) MetricsMetadata(ctx context.Context, in *MetricsMetadat // IngesterServer is the server API for Ingester service. type IngesterServer interface { Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) + PushStream(Ingester_PushStreamServer) error QueryStream(*QueryRequest, Ingester_QueryStreamServer) error QueryExemplars(context.Context, *ExemplarQueryRequest) (*ExemplarQueryResponse, error) LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error) @@ -3061,6 +3095,9 @@ type UnimplementedIngesterServer struct { func (*UnimplementedIngesterServer) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Push not implemented") } +func (*UnimplementedIngesterServer) PushStream(srv Ingester_PushStreamServer) error { + return status.Errorf(codes.Unimplemented, "method PushStream not implemented") +} func (*UnimplementedIngesterServer) QueryStream(req *QueryRequest, srv Ingester_QueryStreamServer) error { return status.Errorf(codes.Unimplemented, "method QueryStream not implemented") } @@ -3117,6 +3154,32 @@ func _Ingester_Push_Handler(srv interface{}, ctx context.Context, dec func(inter return interceptor(ctx, in, info, handler) } +func _Ingester_PushStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(IngesterServer).PushStream(&ingesterPushStreamServer{stream}) +} + +type Ingester_PushStreamServer interface { + Send(*cortexpb.WriteResponse) error + Recv() (*cortexpb.StreamWriteRequest, error) + grpc.ServerStream +} + +type ingesterPushStreamServer struct { + grpc.ServerStream +} + +func (x *ingesterPushStreamServer) Send(m *cortexpb.WriteResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *ingesterPushStreamServer) Recv() (*cortexpb.StreamWriteRequest, error) { + m := new(cortexpb.StreamWriteRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _Ingester_QueryStream_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(QueryRequest) if err := stream.RecvMsg(m); err != nil { @@ -3365,6 +3428,12 @@ var _Ingester_serviceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "PushStream", + Handler: _Ingester_PushStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, { StreamName: "QueryStream", Handler: _Ingester_QueryStream_Handler, diff --git a/pkg/ingester/client/ingester.proto b/pkg/ingester/client/ingester.proto index 0cbfac49c9..38b3039673 100644 --- a/pkg/ingester/client/ingester.proto +++ b/pkg/ingester/client/ingester.proto @@ -13,6 +13,7 @@ option (gogoproto.unmarshaler_all) = true; service Ingester { rpc Push(cortexpb.WriteRequest) returns (cortexpb.WriteResponse) {}; + rpc PushStream(stream cortexpb.StreamWriteRequest) returns (stream cortexpb.WriteResponse) {}; rpc QueryStream(QueryRequest) returns (stream QueryStreamResponse) {}; rpc QueryExemplars(ExemplarQueryRequest) returns (ExemplarQueryResponse) {}; diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index af618488ef..ec4b4a29f8 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -40,6 +40,7 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -141,7 +142,7 @@ type Config struct { IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names"` // For testing, you can override the address and ID of this ingester. - ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error) + ingesterClientFactory func(addr string, cfg client.Config, useStreamConnection bool) (client.HealthAndIngesterClient, error) // For admin contact details AdminLimitMessage string `yaml:"admin_limit_message"` @@ -1531,6 +1532,42 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte return &cortexpb.WriteResponse{}, nil } +func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error { + ctx := srv.Context() + for { + select { + case <-ctx.Done(): + level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "PushStream closed") + return ctx.Err() + default: + } + + req, err := srv.Recv() + + if err == io.EOF { + return nil + } + + if err != nil { + return err + } + ctx = user.InjectOrgID(ctx, req.TenantID) + resp, err := i.Push(ctx, req.Request) + if err != nil { + response, isGRPCError := httpgrpc.HTTPResponseFromError(err) + if !isGRPCError { + err = httpgrpc.Errorf(http.StatusInternalServerError, "%s", err) + response, _ = httpgrpc.HTTPResponseFromError(err) + } + resp.GRPCResponse = response + } + err = srv.Send(resp) + if err != nil { + level.Error(logutil.WithContext(ctx, i.logger)).Log("msg", "error sending from PushStream", "err", err) + } + } +} + func (u *userTSDB) acquireReadLock() error { u.stateMtx.RLock() defer u.stateMtx.RUnlock() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0da6a49161..8e3f4a0882 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3579,7 +3579,7 @@ func TestIngester_QueryStream(t *testing.T) { }() // Query back the series using GRPC streaming. - c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig()) + c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig(), false) require.NoError(t, err) defer c.Close() @@ -3670,7 +3670,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) { }() // Query back the series using GRPC streaming. - c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig()) + c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig(), false) require.NoError(t, err) defer c.Close()