Skip to content

Commit 6e16c38

Browse files
authored
kafka: fix possibility to lose records under load (#917)
1 parent 0bde1f4 commit 6e16c38

File tree

2 files changed

+234
-31
lines changed

2 files changed

+234
-31
lines changed

internal/consuming/kafka.go

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type KafkaConfig struct {
4242
// will pause fetching records from Kafka. By default, this is 16.
4343
// Set to -1 to use non-buffered channel.
4444
PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"`
45+
46+
// FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request.
47+
// If not set the default 50MB is used.
48+
FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"`
4549
}
4650

4751
type topicPartition struct {
@@ -142,6 +146,9 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) {
142146
kgo.ClientID(kafkaClientID),
143147
kgo.InstanceID(c.getInstanceID()),
144148
}
149+
if c.config.FetchMaxBytes > 0 {
150+
opts = append(opts, kgo.FetchMaxBytes(c.config.FetchMaxBytes))
151+
}
145152
if c.config.TLS {
146153
tlsOptionsMap, err := c.config.TLSOptions.ToMap()
147154
if err != nil {
@@ -284,12 +291,19 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error {
284291
return fmt.Errorf("poll error: %w", errors.Join(errs...))
285292
}
286293

294+
pausedTopicPartitions := map[topicPartition]struct{}{}
287295
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
288296
if len(p.Records) == 0 {
289297
return
290298
}
291299

292300
tp := topicPartition{p.Topic, p.Partition}
301+
if _, paused := pausedTopicPartitions[tp]; paused {
302+
// We have already paused this partition during this poll, so we should not
303+
// process records from it anymore. We will resume partition processing with the
304+
// correct offset soon, after we have space in recs buffer.
305+
return
306+
}
293307

294308
// Since we are using BlockRebalanceOnPoll, we can be
295309
// sure this partition consumer exists:
@@ -310,6 +324,12 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error {
310324
// keeping records in memory and blocking rebalance. Resume will be called after
311325
// records are processed by c.consumers[tp].
312326
c.client.PauseFetchPartitions(partitionsToPause)
327+
pausedTopicPartitions[tp] = struct{}{}
328+
// To poll next time from the correct offset we need to set it manually to the offset of
329+
// the first record in the batch. Otherwise, next poll will return the next record batch,
330+
// and we will lose the current one.
331+
epochOffset := kgo.EpochOffset{Epoch: -1, Offset: p.Records[0].Offset}
332+
c.client.SetOffsets(map[string]map[int32]kgo.EpochOffset{p.Topic: {p.Partition: epochOffset}})
313333
}
314334
})
315335
c.client.AllowRebalance()
@@ -355,15 +375,25 @@ func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned m
355375
}
356376
for topic, partitions := range assigned {
357377
for _, partition := range partitions {
378+
quitCh := make(chan struct{})
379+
partitionCtx, cancel := context.WithCancel(ctx)
380+
go func() {
381+
select {
382+
case <-ctx.Done():
383+
cancel()
384+
case <-quitCh:
385+
cancel()
386+
}
387+
}()
358388
pc := &partitionConsumer{
359-
clientCtx: ctx,
360-
dispatcher: c.dispatcher,
361-
logger: c.logger,
362-
cl: cl,
363-
topic: topic,
364-
partition: partition,
365-
366-
quit: make(chan struct{}),
389+
partitionCtx: partitionCtx,
390+
dispatcher: c.dispatcher,
391+
logger: c.logger,
392+
cl: cl,
393+
topic: topic,
394+
partition: partition,
395+
396+
quit: quitCh,
367397
done: make(chan struct{}),
368398
recs: make(chan kgo.FetchTopicPartition, bufferSize),
369399
}
@@ -407,12 +437,12 @@ func (c *KafkaConsumer) killConsumers(lost map[string][]int32) {
407437
}
408438

409439
type partitionConsumer struct {
410-
clientCtx context.Context
411-
dispatcher Dispatcher
412-
logger Logger
413-
cl *kgo.Client
414-
topic string
415-
partition int32
440+
partitionCtx context.Context
441+
dispatcher Dispatcher
442+
logger Logger
443+
cl *kgo.Client
444+
topic string
445+
partition int32
416446

417447
quit chan struct{}
418448
done chan struct{}
@@ -422,40 +452,36 @@ type partitionConsumer struct {
422452
func (pc *partitionConsumer) processRecords(records []*kgo.Record) {
423453
for _, record := range records {
424454
select {
425-
case <-pc.clientCtx.Done():
426-
return
427-
case <-pc.quit:
455+
case <-pc.partitionCtx.Done():
428456
return
429457
default:
430458
}
431459

432460
var e KafkaJSONEvent
433461
err := json.Unmarshal(record.Value, &e)
434462
if err != nil {
435-
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling event from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
463+
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error unmarshalling record value from Kafka", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
436464
pc.cl.MarkCommitRecords(record)
437465
continue
438466
}
439467

440468
var backoffDuration time.Duration = 0
441469
retries := 0
442470
for {
443-
err := pc.dispatcher.Dispatch(pc.clientCtx, e.Method, e.Payload)
471+
err := pc.dispatcher.Dispatch(pc.partitionCtx, e.Method, e.Payload)
444472
if err == nil {
445473
if retries > 0 {
446-
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing events after errors", map[string]any{}))
474+
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "OK processing record after errors", map[string]any{}))
447475
}
448476
pc.cl.MarkCommitRecords(record)
449477
break
450478
}
451479
retries++
452480
backoffDuration = getNextBackoffDuration(backoffDuration, retries)
453-
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed event", map[string]any{"error": err.Error(), "method": e.Method, "nextAttemptIn": backoffDuration.String()}))
481+
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error processing consumed record", map[string]any{"error": err.Error(), "method": e.Method, "next_attempt_in": backoffDuration.String()}))
454482
select {
455483
case <-time.After(backoffDuration):
456-
case <-pc.quit:
457-
return
458-
case <-pc.clientCtx.Done():
484+
case <-pc.partitionCtx.Done():
459485
return
460486
}
461487
}
@@ -471,9 +497,7 @@ func (pc *partitionConsumer) consume() {
471497
defer resumeConsuming()
472498
for {
473499
select {
474-
case <-pc.clientCtx.Done():
475-
return
476-
case <-pc.quit:
500+
case <-pc.partitionCtx.Done():
477501
return
478502
case p := <-pc.recs:
479503
pc.processRecords(p.Records)

internal/consuming/kafka_test.go

Lines changed: 183 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"encoding/json"
88
"errors"
99
"fmt"
10+
"strconv"
1011
"strings"
12+
"sync/atomic"
1113
"testing"
1214
"time"
1315

@@ -44,15 +46,26 @@ func (m *MockLogger) Log(_ centrifuge.LogEntry) {
4446
// Implement mock logic, e.g., storing log entries for assertions
4547
}
4648

49+
func produceManyRecords(records ...*kgo.Record) error {
50+
client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL))
51+
if err != nil {
52+
return fmt.Errorf("failed to create Kafka client: %w", err)
53+
}
54+
defer client.Close()
55+
err = client.ProduceSync(context.Background(), records...).FirstErr()
56+
if err != nil {
57+
return fmt.Errorf("failed to produce message: %w", err)
58+
}
59+
return nil
60+
}
61+
4762
func produceTestMessage(topic string, message []byte) error {
48-
// Create a new client
4963
client, err := kgo.NewClient(kgo.SeedBrokers(testKafkaBrokerURL))
5064
if err != nil {
5165
return fmt.Errorf("failed to create Kafka client: %w", err)
5266
}
5367
defer client.Close()
5468

55-
// Produce a message
5669
err = client.ProduceSync(context.Background(), &kgo.Record{Topic: topic, Partition: 0, Value: message}).FirstErr()
5770
if err != nil {
5871
return fmt.Errorf("failed to produce message: %w", err)
@@ -61,7 +74,6 @@ func produceTestMessage(topic string, message []byte) error {
6174
}
6275

6376
func produceTestMessageToPartition(topic string, message []byte, partition int32) error {
64-
// Create a new client.
6577
client, err := kgo.NewClient(
6678
kgo.SeedBrokers(testKafkaBrokerURL),
6779
kgo.RecordPartitioner(kgo.ManualPartitioner()),
@@ -71,7 +83,6 @@ func produceTestMessageToPartition(topic string, message []byte, partition int32
7183
}
7284
defer client.Close()
7385

74-
// Produce a message until we hit desired partition.
7586
res := client.ProduceSync(context.Background(), &kgo.Record{
7687
Topic: topic, Partition: partition, Value: message})
7788
if res.FirstErr() != nil {
@@ -427,3 +438,171 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T
427438
})
428439
}
429440
}
441+
442+
func TestKafkaConsumer_PausePartitions(t *testing.T) {
443+
t.Parallel()
444+
testKafkaTopic := "consumer_test_" + uuid.New().String()
445+
testPayload1 := []byte(`{"key":"value1"}`)
446+
testPayload2 := []byte(`{"key":"value2"}`)
447+
testPayload3 := []byte(`{"key":"value3"}`)
448+
449+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
450+
defer cancel()
451+
452+
err := createTestTopic(ctx, testKafkaTopic, 1, 1)
453+
require.NoError(t, err)
454+
455+
event1Received := make(chan struct{})
456+
event2Received := make(chan struct{})
457+
event3Received := make(chan struct{})
458+
consumerClosed := make(chan struct{})
459+
doneCh := make(chan struct{})
460+
461+
config := KafkaConfig{
462+
Brokers: []string{testKafkaBrokerURL},
463+
Topics: []string{testKafkaTopic},
464+
ConsumerGroup: uuid.New().String(),
465+
PartitionBufferSize: -1,
466+
}
467+
468+
numCalls := 0
469+
470+
mockDispatcher := &MockDispatcher{
471+
onDispatch: func(ctx context.Context, method string, data []byte) error {
472+
numCalls++
473+
if numCalls == 1 {
474+
close(event1Received)
475+
time.Sleep(5 * time.Second)
476+
return nil
477+
} else if numCalls == 2 {
478+
close(event2Received)
479+
return nil
480+
}
481+
close(event3Received)
482+
return nil
483+
},
484+
}
485+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config)
486+
require.NoError(t, err)
487+
488+
go func() {
489+
err = produceTestMessage(testKafkaTopic, testPayload1)
490+
require.NoError(t, err)
491+
<-event1Received
492+
// At this point message 1 is being processed and the next produced message will
493+
// cause a partition pause.
494+
err = produceTestMessage(testKafkaTopic, testPayload2)
495+
require.NoError(t, err)
496+
<-event2Received
497+
err = produceTestMessage(testKafkaTopic, testPayload3)
498+
require.NoError(t, err)
499+
}()
500+
501+
go func() {
502+
err := consumer.Run(ctx)
503+
require.ErrorIs(t, err, context.Canceled)
504+
close(consumerClosed)
505+
}()
506+
507+
waitCh(t, event1Received, 30*time.Second, "timeout waiting for event 1")
508+
waitCh(t, event2Received, 30*time.Second, "timeout waiting for event 2")
509+
waitCh(t, event3Received, 30*time.Second, "timeout waiting for event 3")
510+
cancel()
511+
waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed")
512+
close(doneCh)
513+
}
514+
515+
func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) {
516+
t.Skip()
517+
t.Parallel()
518+
519+
testCases := []struct {
520+
numPartitions int32
521+
numMessages int
522+
partitionBuffer int
523+
}{
524+
//{numPartitions: 1, numMessages: 1000, partitionBuffer: -1},
525+
//{numPartitions: 1, numMessages: 1000, partitionBuffer: 1},
526+
//{numPartitions: 10, numMessages: 10000, partitionBuffer: -1},
527+
{numPartitions: 10, numMessages: 10000, partitionBuffer: 1},
528+
}
529+
530+
for _, tc := range testCases {
531+
name := fmt.Sprintf("partitions=%d,messages=%d,buffer=%d", tc.numPartitions, tc.numMessages, tc.partitionBuffer)
532+
t.Run(name, func(t *testing.T) {
533+
testKafkaTopic := "consumer_test_" + uuid.New().String()
534+
535+
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
536+
defer cancel()
537+
538+
err := createTestTopic(ctx, testKafkaTopic, tc.numPartitions, 1)
539+
require.NoError(t, err)
540+
541+
consumerClosed := make(chan struct{})
542+
doneCh := make(chan struct{})
543+
544+
numMessages := tc.numMessages
545+
messageCh := make(chan struct{}, numMessages)
546+
547+
mockDispatcher := &MockDispatcher{
548+
onDispatch: func(ctx context.Context, method string, data []byte) error {
549+
// Emulate delay due to some work.
550+
time.Sleep(20 * time.Millisecond)
551+
messageCh <- struct{}{}
552+
return nil
553+
},
554+
}
555+
config := KafkaConfig{
556+
Brokers: []string{testKafkaBrokerURL},
557+
Topics: []string{testKafkaTopic},
558+
ConsumerGroup: uuid.New().String(),
559+
PartitionBufferSize: tc.partitionBuffer,
560+
}
561+
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config)
562+
require.NoError(t, err)
563+
564+
var records []*kgo.Record
565+
for i := 0; i < numMessages; i++ {
566+
records = append(records, &kgo.Record{Topic: testKafkaTopic, Value: []byte(`{"hello": "` + strconv.Itoa(i) + `"}`)})
567+
if (i+1)%100 == 0 {
568+
err = produceManyRecords(records...)
569+
if err != nil {
570+
t.Fatal(err)
571+
}
572+
records = nil
573+
t.Logf("produced %d messages", i+1)
574+
}
575+
}
576+
577+
t.Logf("all messages produced, 3, 2, 1, go!")
578+
time.Sleep(time.Second)
579+
580+
go func() {
581+
err := consumer.Run(ctx)
582+
require.ErrorIs(t, err, context.Canceled)
583+
close(consumerClosed)
584+
}()
585+
586+
var numProcessed int64
587+
go func() {
588+
for {
589+
select {
590+
case <-ctx.Done():
591+
return
592+
case <-time.After(time.Second):
593+
t.Logf("processed %d messages", atomic.LoadInt64(&numProcessed))
594+
}
595+
}
596+
}()
597+
598+
for i := 0; i < numMessages; i++ {
599+
<-messageCh
600+
atomic.AddInt64(&numProcessed, 1)
601+
}
602+
t.Logf("all messages processed")
603+
cancel()
604+
waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed")
605+
close(doneCh)
606+
})
607+
}
608+
}

0 commit comments

Comments
 (0)