@@ -140,7 +140,7 @@ type IngestLimits struct {
140
140
metadata map [string ]map [int32 ][]streamMetadata // tenant -> partitionID -> streamMetadata
141
141
142
142
// Track partition assignments
143
- assignedPartitions map [ int32 ] int64 // partitionID -> lastAssignedAt
143
+ partitionManager * PartitionManager
144
144
}
145
145
146
146
// Flush implements ring.FlushTransferer. It transfers state to another ingest limits instance.
@@ -156,10 +156,11 @@ func (s *IngestLimits) TransferOut(_ context.Context) error {
156
156
func NewIngestLimits (cfg Config , logger log.Logger , reg prometheus.Registerer ) (* IngestLimits , error ) {
157
157
var err error
158
158
s := & IngestLimits {
159
- cfg : cfg ,
160
- logger : logger ,
161
- metadata : make (map [string ]map [int32 ][]streamMetadata ),
162
- metrics : newMetrics (reg ),
159
+ cfg : cfg ,
160
+ logger : logger ,
161
+ metadata : make (map [string ]map [int32 ][]streamMetadata ),
162
+ metrics : newMetrics (reg ),
163
+ partitionManager : NewPartitionManager (logger ),
163
164
}
164
165
165
166
// Initialize internal metadata metrics
@@ -221,7 +222,7 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
221
222
)
222
223
223
224
for partitionID , partition := range partitions {
224
- if _ , assigned := s . assignedPartitions [ partitionID ]; ! assigned {
225
+ if ! s . partitionManager . Has ( partitionID ) {
225
226
continue
226
227
}
227
228
@@ -240,57 +241,40 @@ func (s *IngestLimits) Collect(m chan<- prometheus.Metric) {
240
241
}
241
242
}
242
243
243
- func (s * IngestLimits ) onPartitionsAssigned (_ context.Context , _ * kgo.Client , partitions map [string ][]int32 ) {
244
+ func (s * IngestLimits ) onPartitionsAssigned (ctx context.Context , client * kgo.Client , partitions map [string ][]int32 ) {
245
+ s .partitionManager .Assign (ctx , client , partitions )
246
+ }
247
+
248
+ func (s * IngestLimits ) onPartitionsRevoked (ctx context.Context , client * kgo.Client , partitions map [string ][]int32 ) {
244
249
s .mtx .Lock ()
245
250
defer s .mtx .Unlock ()
246
-
247
- if s .assignedPartitions == nil {
248
- s .assignedPartitions = make (map [int32 ]int64 )
249
- }
250
-
251
- var assigned []string
251
+ s .partitionManager .Remove (ctx , client , partitions )
252
+ // TODO(grobinson): Use callbacks from partition manager to delete
253
+ // metadata.
252
254
for _ , partitionIDs := range partitions {
253
255
for _ , partitionID := range partitionIDs {
254
- s .assignedPartitions [partitionID ] = time .Now ().UnixNano ()
255
- assigned = append (assigned , strconv .Itoa (int (partitionID )))
256
+ // Delete partition from tenant metadata.
257
+ for _ , tp := range s .metadata {
258
+ delete (tp , partitionID )
259
+ }
256
260
}
257
261
}
258
-
259
- if len (assigned ) > 0 {
260
- level .Debug (s .logger ).Log ("msg" , "assigned partitions" , "partitions" , strings .Join (assigned , "," ))
261
- }
262
- }
263
-
264
- func (s * IngestLimits ) onPartitionsRevoked (_ context.Context , _ * kgo.Client , partitions map [string ][]int32 ) {
265
- s .removePartitions (partitions )
266
262
}
267
263
268
- func (s * IngestLimits ) onPartitionsLost (_ context.Context , _ * kgo.Client , partitions map [string ][]int32 ) {
269
- s .removePartitions (partitions )
270
- }
271
-
272
- func (s * IngestLimits ) removePartitions (partitions map [string ][]int32 ) {
264
+ func (s * IngestLimits ) onPartitionsLost (ctx context.Context , client * kgo.Client , partitions map [string ][]int32 ) {
273
265
s .mtx .Lock ()
274
266
defer s .mtx .Unlock ()
275
-
276
- var dropped int
277
-
267
+ s . partitionManager . Remove ( ctx , client , partitions )
268
+ // TODO(grobinson): Use callbacks from partition manager to delete
269
+ // metadata.
278
270
for _ , partitionIDs := range partitions {
279
- dropped += len (partitionIDs )
280
271
for _ , partitionID := range partitionIDs {
281
- // Unassign the partition from the ingest limits instance
282
- delete (s .assignedPartitions , partitionID )
283
-
284
- // Remove the partition from the metadata map
285
- for _ , partitions := range s .metadata {
286
- delete (partitions , partitionID )
272
+ // Delete partition from tenant metadata.
273
+ for _ , tp := range s .metadata {
274
+ delete (tp , partitionID )
287
275
}
288
276
}
289
277
}
290
-
291
- if dropped > 0 {
292
- level .Debug (s .logger ).Log ("msg" , "removed partitions" , "partitions" , dropped )
293
- }
294
278
}
295
279
296
280
func (s * IngestLimits ) CheckReady (ctx context.Context ) error {
@@ -449,7 +433,7 @@ func (s *IngestLimits) updateMetadata(rec *logproto.StreamMetadata, tenant strin
449
433
}
450
434
451
435
// Partition not assigned to this instance, evict stream
452
- if _ , assigned := s .assignedPartitions [ partition ] ; ! assigned {
436
+ if assigned := s .partitionManager . Has ( partition ) ; ! assigned {
453
437
for i , stream := range s.metadata [tenant ][partition ] {
454
438
if stream .hash == rec .StreamHash {
455
439
s.metadata [tenant ][partition ] = append (s.metadata [tenant ][partition ][:i ], s.metadata [tenant ][partition ][i + 1 :]... )
@@ -581,14 +565,10 @@ func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request) {
581
565
func (s * IngestLimits ) GetAssignedPartitions (_ context.Context , _ * logproto.GetAssignedPartitionsRequest ) (* logproto.GetAssignedPartitionsResponse , error ) {
582
566
s .mtx .RLock ()
583
567
defer s .mtx .RUnlock ()
584
-
585
- // Make a copy of the assigned partitions map to avoid potential concurrent access issues
586
- partitions := make (map [int32 ]int64 , len (s .assignedPartitions ))
587
- for k , v := range s .assignedPartitions {
588
- partitions [k ] = v
568
+ resp := logproto.GetAssignedPartitionsResponse {
569
+ AssignedPartitions : s .partitionManager .List (),
589
570
}
590
-
591
- return & logproto.GetAssignedPartitionsResponse {AssignedPartitions : partitions }, nil
571
+ return & resp , nil
592
572
}
593
573
594
574
// GetStreamUsage implements the logproto.IngestLimitsServer interface.
0 commit comments