Commit 0ecf317

Option to include quorum zone results for DistributorQueryable metadata API (#5779)
* add zone-results-quorum for metadata APIs
* update doc
* integration test
* fix tests
* mark flag hidden
* changelog
* refactor interfaces
* update comment

Signed-off-by: Ben Ye <[email protected]>
1 parent 73c8e5a commit 0ecf317

8 files changed (+363, -61 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@
 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
 * [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
 * [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
+* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names, values and series, only results from quorum number of zones will be included and merged. #5779
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
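
This CHANGELOG entry is the user-facing summary of the change: a hidden, experimental `zone_results_quorum_metadata` option (CLI flag `-distributor.zone-results-quorum-metadata`) that, with zone awareness enabled, merges metadata results from only a quorum of zones. As a rough illustration of what "quorum number of zones" works out to, the sketch below assumes a ring with three zones that tolerates one unavailable zone; the numbers are illustrative and not taken from this commit.

```go
package main

import "fmt"

// Illustrative only: with zone awareness the replication set is built per zone
// and may tolerate some zones being unavailable. The zones that must answer
// form the quorum; with zone_results_quorum_metadata enabled, only their
// results are merged for metadata queries.
func main() {
	zones := 3               // assumed layout: zone-a, zone-b, zone-c
	maxUnavailableZones := 1 // assumed tolerance for this 3-zone example
	quorumZones := zones - maxUnavailableZones

	fmt.Printf("metadata results merged from %d of %d zones\n", quorumZones, zones)
	// Output: metadata results merged from 2 of 3 zones
}
```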

integration/zone_aware_test.go

Lines changed: 92 additions & 0 deletions
@@ -151,3 +151,95 @@ func TestZoneAwareReplication(t *testing.T) {
 	require.Equal(t, 500, res.StatusCode)
 
 }
+
+func TestZoneResultsQuorum(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	flags := BlocksStorageFlags()
+	flags["-distributor.shard-by-all-labels"] = "true"
+	flags["-distributor.replication-factor"] = "3"
+	flags["-distributor.zone-awareness-enabled"] = "true"
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Start Cortex components.
+	ingesterFlags := func(zone string) map[string]string {
+		return mergeFlags(flags, map[string]string{
+			"-ingester.availability-zone": zone,
+		})
+	}
+
+	ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
+	ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
+	ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
+	ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
+	ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
+	ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
+	require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6))
+
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	flagsZoneResultsQuorum := mergeFlags(flags, map[string]string{
+		"-distributor.zone-results-quorum-metadata": "true",
+	})
+	querierZoneResultsQuorum := e2ecortex.NewQuerier("querier-zrq", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsZoneResultsQuorum, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, querier, querierZoneResultsQuorum))
+
+	// Wait until distributor and queriers have updated the ring.
+	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+	require.NoError(t, querierZoneResultsQuorum.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
+	require.NoError(t, err)
+	clientZoneResultsQuorum, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querierZoneResultsQuorum.HTTPEndpoint(), "", "", userID)
+	require.NoError(t, err)
+
+	// Push some series
+	now := time.Now()
+	numSeries := 100
+	expectedVectors := map[string]model.Vector{}
+
+	for i := 1; i <= numSeries; i++ {
+		metricName := fmt.Sprintf("series_%d", i)
+		series, expectedVector := generateSeries(metricName, now)
+		res, err := client.Push(series)
+		require.NoError(t, err)
+		require.Equal(t, 200, res.StatusCode)
+
+		expectedVectors[metricName] = expectedVector
+	}
+
+	start := now.Add(-time.Hour)
+	end := now.Add(time.Hour)
+	res1, err := client.LabelNames(start, end)
+	require.NoError(t, err)
+	res2, err := clientZoneResultsQuorum.LabelNames(start, end)
+	require.NoError(t, err)
+	assert.Equal(t, res1, res2)
+
+	values1, err := client.LabelValues(labels.MetricName, start, end, nil)
+	require.NoError(t, err)
+	values2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end, nil)
+	require.NoError(t, err)
+	assert.Equal(t, values1, values2)
+
+	series1, err := client.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
+	require.NoError(t, err)
+	series2, err := clientZoneResultsQuorum.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end)
+	require.NoError(t, err)
+	assert.Equal(t, series1, series2)
+}

pkg/distributor/distributor.go

Lines changed: 16 additions & 10 deletions
@@ -145,6 +145,11 @@ type Config struct {
 	// This config is dynamically injected because defined in the querier config.
 	ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`
 
+	// ZoneResultsQuorumMetadata enables zone results quorum when querying ingester replication set
+	// with metadata APIs (labels names, values and series). When zone awareness is enabled, only results
+	// from quorum number of zones will be included to reduce data merged and improve performance.
+	ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata" doc:"hidden"`
+
 	// Limits for distributor
 	InstanceLimits InstanceLimits `yaml:"instance_limits"`
 }
@@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
 	f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
 	f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
+	f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names, values and series), only results from quorum number of zones will be included.")
 
 	f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
 	f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
@@ -924,8 +930,8 @@ func getErrorStatus(err error) string {
 }
 
 // ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
-func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
-	return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
+func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
+	return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
 		if err != nil {
 			return nil, err
@@ -981,7 +987,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
 // LabelValuesForLabelName returns all the label values that are associated with a given label name.
 func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
 	return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
-		return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			resp, err := client.LabelValues(ctx, req)
 			if err != nil {
 				return nil, err
@@ -994,7 +1000,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
 // LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
 func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
 	return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
-		return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			stream, err := client.LabelValuesStream(ctx, req)
 			if err != nil {
 				return nil, err
@@ -1059,7 +1065,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
 
 func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
 	return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
-		return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			stream, err := client.LabelNamesStream(ctx, req)
 			if err != nil {
 				return nil, err
@@ -1085,7 +1091,7 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
 // LabelNames returns all the label names.
 func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
 	return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
-		return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			resp, err := client.LabelNames(ctx, req)
 			if err != nil {
 				return nil, err
@@ -1098,7 +1104,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
 // MetricsForLabelMatchers gets the metrics that match said matchers
 func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
 	return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
-		_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			resp, err := client.MetricsForLabelMatchers(ctx, req)
 			if err != nil {
 				return nil, err
@@ -1127,7 +1133,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
 
 func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
 	return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
-		_, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+		_, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 			stream, err := client.MetricsForLabelMatchersStream(ctx, req)
 			if err != nil {
 				return nil, err
@@ -1205,7 +1211,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
 
 	req := &ingester_client.MetricsMetadataRequest{}
 	// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
-	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 		return client.MetricsMetadata(ctx, req)
 	})
 	if err != nil {
@@ -1247,7 +1253,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
 	replicationSet.MaxErrors = 0
 
 	req := &ingester_client.UserStatsRequest{}
-	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
 		return client.UserStats(ctx, req)
 	})
 	if err != nil {
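
Every metadata path above now threads `d.cfg.ZoneResultsQuorumMetadata` into `ForReplicationSet`, while `UserStats` explicitly passes `false`. The benefit for metadata calls is in the merge step: each ingester response has to be folded into one deduplicated, sorted list, so taking results from only a quorum of zones means fewer responses to merge. Below is a minimal, self-contained sketch of that kind of merge; it is not the Cortex implementation, and the helper name and sample data are invented for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeLabelNames folds per-ingester label name responses into one sorted,
// deduplicated slice, the way a distributor-style merge has to.
func mergeLabelNames(responses [][]string) []string {
	seen := map[string]struct{}{}
	for _, resp := range responses {
		for _, name := range resp {
			seen[name] = struct{}{}
		}
	}
	merged := make([]string, 0, len(seen))
	for name := range seen {
		merged = append(merged, name)
	}
	sort.Strings(merged)
	return merged
}

func main() {
	// With zone results quorum enabled, responses from the zones beyond quorum
	// never reach this step, so there is simply less to merge.
	quorumZoneResponses := [][]string{
		{"__name__", "instance", "job"}, // zone-a ingesters
		{"__name__", "job", "pod"},      // zone-b ingesters
	}
	fmt.Println(mergeLabelNames(quorumZoneResponses))
	// Output: [__name__ instance job pod]
}
```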

pkg/distributor/query.go

Lines changed: 3 additions & 3 deletions
@@ -161,7 +161,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
 func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) {
 	// Fetch samples from multiple ingesters in parallel, using the replicationSet
 	// to deal with consistency.
-	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
+	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
 		if err != nil {
 			return nil, err
@@ -232,7 +232,7 @@ func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar {
 func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) {
 	// Fetch exemplars from multiple ingesters in parallel, using the replicationSet
 	// to deal with consistency.
-	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
+	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
 		if err != nil {
 			return nil, err
@@ -293,7 +293,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 	)
 
 	// Fetch samples from multiple ingesters
-	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
+	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
 		if err != nil {
 			return nil, err

pkg/ring/replication_set.go

Lines changed: 7 additions & 10 deletions
@@ -21,18 +21,19 @@ type ReplicationSet struct {
 }
 
 // Do function f in parallel for all replicas in the set, erroring is we exceed
-// MaxErrors and returning early otherwise.
-func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
+// MaxErrors and returning early otherwise. zoneResultsQuorum allows only include
+// results from zones that already reach quorum to improve performance.
+func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
 	type instanceResult struct {
 		res      interface{}
 		err      error
 		instance *InstanceDesc
 	}
 
-	// Initialise the result tracker, which is use to keep track of successes and failures.
+	// Initialise the result tracker, which is used to keep track of successes and failures.
 	var tracker replicationSetResultTracker
 	if r.MaxUnavailableZones > 0 {
-		tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones)
+		tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, zoneResultsQuorum)
 	} else {
 		tracker = newDefaultResultTracker(r.Instances, r.MaxErrors)
 	}
@@ -67,12 +68,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
 		}(i, &r.Instances[i])
 	}
 
-	results := make([]interface{}, 0, len(r.Instances))
-
 	for !tracker.succeeded() {
 		select {
 		case res := <-ch:
-			tracker.done(res.instance, res.err)
+			tracker.done(res.instance, res.res, res.err)
 			if res.err != nil {
 				if tracker.failed() {
 					return nil, res.err
@@ -82,16 +81,14 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
 				if delay > 0 && r.MaxUnavailableZones == 0 {
 					forceStart <- struct{}{}
 				}
-			} else {
-				results = append(results, res.res)
 			}
 
 		case <-ctx.Done():
			return nil, ctx.Err()
 		}
 	}
 
-	return results, nil
+	return tracker.getResults(), nil
 }
 
 // Includes returns whether the replication set includes the replica with the provided addr.
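
The `Do` changes above stop accumulating results inline and instead hand both bookkeeping and collection to the result tracker: `done` now receives the result along with the error, and the final slice comes from `tracker.getResults()`. That restructuring is what lets the zone-aware tracker return only the results of zones that reached quorum. The following is a simplified, self-contained stand-in for such a tracker; it is not `newZoneAwareResultTracker` from this repository, and its field names and quorum bookkeeping are illustrative assumptions.

```go
package main

import "fmt"

type instance struct {
	Zone, Addr string
}

// zoneTracker is a simplified stand-in for a zone-aware result tracker.
// It records per-instance results grouped by zone and reports success once
// a quorum of zones has fully succeeded.
type zoneTracker struct {
	waitingPerZone    map[string]int
	failuresPerZone   map[string]int
	resultsPerZone    map[string][]interface{}
	minSucceededZones int
	zoneResultsQuorum bool
	succeededZones    int
}

func newZoneTracker(instances []instance, maxUnavailableZones int, zoneResultsQuorum bool) *zoneTracker {
	t := &zoneTracker{
		waitingPerZone:    map[string]int{},
		failuresPerZone:   map[string]int{},
		resultsPerZone:    map[string][]interface{}{},
		zoneResultsQuorum: zoneResultsQuorum,
	}
	for _, ins := range instances {
		t.waitingPerZone[ins.Zone]++
	}
	t.minSucceededZones = len(t.waitingPerZone) - maxUnavailableZones
	return t
}

// done records a single instance's outcome, mirroring the new
// done(instance, result, err) shape used in the diff above.
func (t *zoneTracker) done(ins instance, res interface{}, err error) {
	if err != nil {
		t.failuresPerZone[ins.Zone]++
		return
	}
	t.resultsPerZone[ins.Zone] = append(t.resultsPerZone[ins.Zone], res)
	t.waitingPerZone[ins.Zone]--
	if t.waitingPerZone[ins.Zone] == 0 && t.failuresPerZone[ins.Zone] == 0 {
		t.succeededZones++
	}
}

func (t *zoneTracker) succeeded() bool { return t.succeededZones >= t.minSucceededZones }

// getResults returns everything collected so far, or, when zoneResultsQuorum
// is set, only the results from zones that completed without any failure.
func (t *zoneTracker) getResults() []interface{} {
	var out []interface{}
	for zone, results := range t.resultsPerZone {
		if t.zoneResultsQuorum && (t.waitingPerZone[zone] > 0 || t.failuresPerZone[zone] > 0) {
			continue
		}
		out = append(out, results...)
	}
	return out
}

func main() {
	instances := []instance{
		{"zone-a", "ingester-1"}, {"zone-a", "ingester-2"},
		{"zone-b", "ingester-3"}, {"zone-b", "ingester-4"},
		{"zone-c", "ingester-5"}, {"zone-c", "ingester-6"},
	}
	// Assumed: 3 zones, 1 may be unavailable, so quorum is 2 zones.
	t := newZoneTracker(instances, 1, true)

	// zone-a and zone-b answer first; zone-c is still in flight.
	for _, ins := range instances[:4] {
		t.done(ins, "label names from "+ins.Addr, nil)
	}

	fmt.Println(t.succeeded())       // true: quorum of zones is done, stop waiting
	fmt.Println(len(t.getResults())) // 4: only quorum zones' results are returned
}
```

When `zoneResultsQuorum` is false, a tracker like this would simply return everything it has collected, which matches the pre-existing behaviour kept for the sample, exemplar and `UserStats` paths.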
