@@ -21,6 +21,7 @@ import (
21
21
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22
22
"github.com/cortexproject/cortex/integration/e2ecortex"
23
23
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24
+ "github.com/cortexproject/cortex/pkg/util"
24
25
)
25
26
26
27
func TestQuerierWithBlocksStorageRunningInMicroservicesMode (t * testing.T ) {
@@ -124,8 +125,8 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
124
125
// Push some series to Cortex.
125
126
series1Timestamp := time .Now ()
126
127
series2Timestamp := series1Timestamp .Add (blockRangePeriod * 2 )
127
- series1 , expectedVector1 := generateSeries ("series_1" , series1Timestamp )
128
- series2 , expectedVector2 := generateSeries ("series_2" , series2Timestamp )
128
+ series1 , expectedVector1 := generateSeries ("series_1" , series1Timestamp , prompb. Label { Name : "series_1" , Value : "series_1" } )
129
+ series2 , expectedVector2 := generateSeries ("series_2" , series2Timestamp , prompb. Label { Name : "series_2" , Value : "series_2" } )
129
130
130
131
res , err := c .Push (series1 )
131
132
require .NoError (t , err )
@@ -145,7 +146,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
145
146
// Push another series to further compact another block and delete the first block
146
147
// due to expired retention.
147
148
series3Timestamp := series2Timestamp .Add (blockRangePeriod * 2 )
148
- series3 , expectedVector3 := generateSeries ("series_3" , series3Timestamp )
149
+ series3 , expectedVector3 := generateSeries ("series_3" , series3Timestamp , prompb. Label { Name : "series_3" , Value : "series_3" } )
149
150
150
151
res , err = c .Push (series3 )
151
152
require .NoError (t , err )
@@ -219,6 +220,9 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
219
220
require .NoError (t , storeGateways .WaitSumMetrics (e2e .Equals (11 + 2 ), "thanos_memcached_operations_total" )) // as before + 2 gets
220
221
}
221
222
223
+ // Query metadata.
224
+ testMetadataQueriesWithBlocksStorage (t , c , series1 , series2 , series3 , blockRangePeriod )
225
+
222
226
// Ensure no service-specific metrics prefix is used by the wrong service.
223
227
assertServiceMetricsPrefixes (t , Distributor , distributor )
224
228
assertServiceMetricsPrefixes (t , Ingester , ingester )
@@ -323,8 +327,8 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
323
327
// Push some series to Cortex.
324
328
series1Timestamp := time .Now ()
325
329
series2Timestamp := series1Timestamp .Add (blockRangePeriod * 2 )
326
- series1 , expectedVector1 := generateSeries ("series_1" , series1Timestamp )
327
- series2 , expectedVector2 := generateSeries ("series_2" , series2Timestamp )
330
+ series1 , expectedVector1 := generateSeries ("series_1" , series1Timestamp , prompb. Label { Name : "series_1" , Value : "series_1" } )
331
+ series2 , expectedVector2 := generateSeries ("series_2" , series2Timestamp , prompb. Label { Name : "series_2" , Value : "series_2" } )
328
332
329
333
res , err := c .Push (series1 )
330
334
require .NoError (t , err )
@@ -344,7 +348,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
344
348
// Push another series to further compact another block and delete the first block
345
349
// due to expired retention.
346
350
series3Timestamp := series2Timestamp .Add (blockRangePeriod * 2 )
347
- series3 , expectedVector3 := generateSeries ("series_3" , series3Timestamp )
351
+ series3 , expectedVector3 := generateSeries ("series_3" , series3Timestamp , prompb. Label { Name : "series_3" , Value : "series_3" } )
348
352
349
353
res , err = c .Push (series3 )
350
354
require .NoError (t , err )
@@ -410,115 +414,21 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
410
414
} else if testCfg .indexCacheBackend == tsdb .IndexCacheBackendMemcached {
411
415
require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 ((11 + 2 )* seriesReplicationFactor )), "thanos_memcached_operations_total" )) // as before + 2 gets
412
416
}
417
+
418
+ // Query metadata.
419
+ testMetadataQueriesWithBlocksStorage (t , c , series1 , series2 , series3 , blockRangePeriod )
413
420
})
414
421
}
415
422
}
416
423
417
- func TestMetadataQueriesWithBlocksStorage (t * testing.T ) {
418
- const blockRangePeriod = 5 * time .Second
419
-
420
- s , err := e2e .NewScenario (networkName )
421
- require .NoError (t , err )
422
- defer s .Close ()
423
-
424
- // Start dependencies.
425
- consul := e2edb .NewConsul ()
426
- minio := e2edb .NewMinio (9000 , bucketName )
427
- memcached := e2ecache .NewMemcached ()
428
- require .NoError (t , s .StartAndWaitReady (consul , minio , memcached ))
429
-
430
- // Setting the replication factor equal to the number of Cortex replicas
431
- // make sure each replica creates the same blocks, so the total number of
432
- // blocks is stable and easy to assert on.
433
- const seriesReplicationFactor = 2
434
-
435
- // Configure the blocks storage to frequently compact TSDB head
436
- // and ship blocks to the storage.
437
- flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
438
- "-blocks-storage.tsdb.block-ranges-period" : blockRangePeriod .String (),
439
- "-blocks-storage.tsdb.ship-interval" : "1s" ,
440
- "-blocks-storage.bucket-store.sync-interval" : "1s" ,
441
- "-blocks-storage.tsdb.retention-period" : ((blockRangePeriod * 2 ) - 1 ).String (),
442
- "-blocks-storage.bucket-store.index-cache.backend" : tsdb .IndexCacheBackendMemcached ,
443
- "-blocks-storage.bucket-store.index-cache.memcached.addresses" : "dns+" + memcached .NetworkEndpoint (e2ecache .MemcachedPort ),
444
- "-querier.ingester-streaming" : "true" ,
445
- // Ingester.
446
- "-ring.store" : "consul" ,
447
- "-consul.hostname" : consul .NetworkHTTPEndpoint (),
448
- // Distributor.
449
- "-distributor.replication-factor" : strconv .FormatInt (seriesReplicationFactor , 10 ),
450
- // Store-gateway.
451
- "-store-gateway.sharding-enabled" : "true" ,
452
- "-store-gateway.sharding-ring.store" : "consul" ,
453
- "-store-gateway.sharding-ring.consul.hostname" : consul .NetworkHTTPEndpoint (),
454
- "-store-gateway.sharding-ring.replication-factor" : "1" ,
455
- })
456
-
457
- // Start Cortex replicas.
458
- cortex1 := e2ecortex .NewSingleBinary ("cortex-1" , flags , "" )
459
- cortex2 := e2ecortex .NewSingleBinary ("cortex-2" , flags , "" )
460
- cluster := e2ecortex .NewCompositeCortexService (cortex1 , cortex2 )
461
- require .NoError (t , s .StartAndWaitReady (cortex1 , cortex2 ))
462
-
463
- // Wait until Cortex replicas have updated the ring state.
464
- for _ , replica := range cluster .Instances () {
465
- numTokensPerInstance := 512 // Ingesters ring.
466
- numTokensPerInstance += 512 * 2 // Store-gateway ring (read both by the querier and store-gateway).
467
-
468
- require .NoError (t , replica .WaitSumMetrics (e2e .Equals (float64 (numTokensPerInstance * cluster .NumInstances ())), "cortex_ring_tokens_total" ))
469
- }
470
-
471
- c , err := e2ecortex .NewClient (cortex1 .HTTPEndpoint (), cortex2 .HTTPEndpoint (), "" , "" , "user-1" )
472
- require .NoError (t , err )
473
-
474
- // Push some series to Cortex.
475
- // Not using time.Now() as we will be using block boundaries and being aligned helps.
476
- series1Timestamp := time .Unix (int64 (blockRangePeriod .Seconds ()), 0 )
477
- series2Timestamp := series1Timestamp .Add (blockRangePeriod * 2 )
478
- series1 , expectedVector1 := generateSeries ("series_1" , series1Timestamp , prompb.Label {Name : "series_1" , Value : "series_1" })
479
- series2 , expectedVector2 := generateSeries ("series_2" , series2Timestamp , prompb.Label {Name : "series_2" , Value : "series_2" })
480
-
481
- res , err := c .Push (series1 )
482
- require .NoError (t , err )
483
- require .Equal (t , 200 , res .StatusCode )
484
-
485
- res , err = c .Push (series2 )
486
- require .NoError (t , err )
487
- require .Equal (t , 200 , res .StatusCode )
488
-
489
- // Wait until the TSDB head is compacted and shipped to the storage.
490
- // The shipped block contains the 1st series, while the 2ns series in in the head.
491
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (1 * cluster .NumInstances ())), "cortex_ingester_shipper_uploads_total" ))
492
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (1 * cluster .NumInstances ())), "cortex_ingester_memory_series" ))
493
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (2 * cluster .NumInstances ())), "cortex_ingester_memory_series_created_total" ))
494
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (1 * cluster .NumInstances ())), "cortex_ingester_memory_series_removed_total" ))
495
-
496
- // Push another series to further compact another block and delete the first block
497
- // due to expired retention.
498
- series3Timestamp := series2Timestamp .Add (blockRangePeriod * 2 )
499
- series3 , expectedVector3 := generateSeries ("series_3" , series3Timestamp , prompb.Label {Name : "series_3" , Value : "series_3" })
500
-
501
- res , err = c .Push (series3 )
502
- require .NoError (t , err )
503
- require .Equal (t , 200 , res .StatusCode )
504
-
505
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (2 * cluster .NumInstances ())), "cortex_ingester_shipper_uploads_total" ))
506
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (1 * cluster .NumInstances ())), "cortex_ingester_memory_series" ))
507
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (3 * cluster .NumInstances ())), "cortex_ingester_memory_series_created_total" ))
508
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (2 * cluster .NumInstances ())), "cortex_ingester_memory_series_removed_total" ))
509
-
510
- // Wait until the querier has discovered the uploaded blocks (discovered both by the querier and store-gateway).
511
- require .NoError (t , cluster .WaitSumMetricsWithOptions (e2e .Equals (float64 (2 * cluster .NumInstances ()* 2 )), []string {"cortex_blocks_meta_synced" }, e2e .WithLabelMatchers (
512
- labels .MustNewMatcher (labels .MatchEqual , "component" , "querier" ))))
513
-
514
- // Wait until the store-gateway has synched the new uploaded blocks.
515
- const shippedBlocks = 2
516
-
517
- require .NoError (t , cluster .WaitSumMetrics (e2e .Equals (float64 (shippedBlocks * seriesReplicationFactor )), "cortex_bucket_store_blocks_loaded" ))
518
-
424
+ func testMetadataQueriesWithBlocksStorage (t * testing.T , c * e2ecortex.Client , series1 , series2 , series3 []prompb.TimeSeries , blockRangePeriod time.Duration ) {
519
425
// series1 is only in storage
520
426
// series2 is in ingester but not head
521
427
// series3 is in head only.
428
+ series1Timestamp := util .TimeFromMillis (series1 [0 ].Samples [0 ].Timestamp )
429
+ series2Timestamp := util .TimeFromMillis (series2 [0 ].Samples [0 ].Timestamp )
430
+ series3Timestamp := util .TimeFromMillis (series3 [0 ].Samples [0 ].Timestamp )
431
+
522
432
type seriesTest struct {
523
433
lookup string
524
434
ok bool
@@ -529,8 +439,7 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
529
439
resp []string
530
440
}
531
441
532
- testCases := []struct {
533
- name string
442
+ testCases := map [string ]struct {
534
443
from time.Time
535
444
to time.Time
536
445
@@ -540,9 +449,7 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
540
449
541
450
labelNames []string
542
451
}{
543
- // Query entirely inside the head range.
544
- {
545
- name : "insideHead" ,
452
+ "query metadata entirely inside the head range" : {
546
453
from : series3Timestamp .Add (- blockRangePeriod ),
547
454
to : series3Timestamp ,
548
455
@@ -567,9 +474,7 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
567
474
568
475
labelNames : []string {labels .MetricName , "series_3" },
569
476
},
570
- // Query entirely inside the ingester range but outside the head range.
571
- {
572
- name : "insideIngesterOutsideHead" ,
477
+ "query metadata entirely inside the ingester range but outside the head range" : {
573
478
from : series2Timestamp ,
574
479
to : series2Timestamp .Add (blockRangePeriod / 2 ),
575
480
@@ -594,9 +499,7 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
594
499
595
500
labelNames : []string {labels .MetricName , "series_2" },
596
501
},
597
- // Query partially inside the ingester range. Should return the head + local disk data.
598
- {
599
- name : "partiallyInsideIngester" ,
502
+ "query metadata partially inside the ingester range. Should return the head + local disk data" : {
600
503
from : series1Timestamp .Add (- blockRangePeriod ),
601
504
to : series2Timestamp .Add (blockRangePeriod / 2 ),
602
505
@@ -622,10 +525,7 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
622
525
623
526
labelNames : []string {labels .MetricName , "series_2" , "series_3" },
624
527
},
625
-
626
- // Query entirely outside the ingester range. Should return the head data.
627
- {
628
- name : "outsideIngester" ,
528
+ "query metadata entirely outside the ingester range should return the head data only" : {
629
529
from : series1Timestamp .Add (- blockRangePeriod ),
630
530
to : series1Timestamp ,
631
531
@@ -652,8 +552,8 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
652
552
},
653
553
}
654
554
655
- for _ , tc := range testCases {
656
- t .Run (tc . name , func (t * testing.T ) {
555
+ for name , tc := range testCases {
556
+ t .Run (name , func (t * testing.T ) {
657
557
for _ , st := range tc .seriesTests {
658
558
seriesRes , err := c .Series ([]string {st .lookup }, tc .from , tc .to )
659
559
require .NoError (t , err )
@@ -672,31 +572,14 @@ func TestMetadataQueriesWithBlocksStorage(t *testing.T) {
672
572
for _ , val := range lvt .resp {
673
573
exp = append (exp , model .LabelValue (val ))
674
574
}
675
- require .Equal (t , exp , labelsRes )
575
+ require .ElementsMatch (t , exp , labelsRes )
676
576
}
677
577
678
578
labelNames , err := c .LabelNames (tc .from , tc .to )
679
579
require .NoError (t , err )
680
580
require .Equal (t , tc .labelNames , labelNames )
681
581
})
682
582
}
683
-
684
- // Also just check normal queries, cuz why not :)
685
-
686
- result , err := c .Query ("series_1" , series1Timestamp )
687
- require .NoError (t , err )
688
- require .Equal (t , model .ValVector , result .Type ())
689
- assert .Equal (t , expectedVector1 , result .(model.Vector ))
690
-
691
- result , err = c .Query ("series_2" , series2Timestamp )
692
- require .NoError (t , err )
693
- require .Equal (t , model .ValVector , result .Type ())
694
- assert .Equal (t , expectedVector2 , result .(model.Vector ))
695
-
696
- result , err = c .Query ("series_3" , series3Timestamp )
697
- require .NoError (t , err )
698
- require .Equal (t , model .ValVector , result .Type ())
699
- assert .Equal (t , expectedVector3 , result .(model.Vector ))
700
583
}
701
584
702
585
func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage (t * testing.T ) {
0 commit comments