From 8a5d73bf6daef7da95cf8df657332f296dea9a62 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Mon, 10 Mar 2025 11:46:15 -0400 Subject: [PATCH 1/3] fix: tester fails when messages cannot be written --- cmd/topicctl/subcmd/tester.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index 03bd18eb..0b959e82 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -160,13 +160,13 @@ func runTestWriter(ctx context.Context) error { writer := kafka.NewWriter( kafka.WriterConfig{ - Brokers: []string{connector.Config.BrokerAddr}, - Dialer: connector.Dialer, - Topic: testerConfig.topic, - Balancer: &kafka.LeastBytes{}, - Async: true, - QueueCapacity: 5, - BatchSize: 5, + Brokers: []string{connector.Config.BrokerAddr}, + Dialer: connector.Dialer, + Topic: testerConfig.topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + BatchSize: 5, + BatchTimeout: 1 * time.Millisecond, }, ) defer writer.Close() @@ -183,17 +183,22 @@ func runTestWriter(ctx context.Context) error { case <-ctx.Done(): return nil case <-sendTicker.C: - err := writer.WriteMessages( - ctx, - kafka.Message{ + msgs := []kafka.Message{} + + for i := 0; i < 5; i++ { + msgs = append(msgs, kafka.Message{ Key: []byte(fmt.Sprintf("msg_%d", index)), Value: []byte(fmt.Sprintf("Contents of test message %d", index)), - }, + }) + index++ + } + err := writer.WriteMessages( + ctx, + msgs..., ) if err != nil { return err } - index++ case <-logTicker.C: log.Infof("%d messages sent", index) } From ede14c8272ba4478eb508c0cd4297c63f248dd2b Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Mon, 10 Mar 2025 13:18:59 -0400 Subject: [PATCH 2/3] fix writeRate --- cmd/topicctl/subcmd/tester.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index 0b959e82..f933daea 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -158,6 +158,8 @@ func runTestWriter(ctx context.Context) error { return errors.New("Stopping because of user response") } + batchSize := 5 + writer := kafka.NewWriter( kafka.WriterConfig{ Brokers: []string{connector.Config.BrokerAddr}, @@ -165,14 +167,14 @@ func runTestWriter(ctx context.Context) error { Topic: testerConfig.topic, Balancer: &kafka.LeastBytes{}, Async: false, - BatchSize: 5, + BatchSize: batchSize, BatchTimeout: 1 * time.Millisecond, }, ) defer writer.Close() index := 0 - tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate)) * time.Millisecond + tickDuration := time.Duration(1000.0/float64(testerConfig.writeRate/batchSize)) * time.Millisecond sendTicker := time.NewTicker(tickDuration) logTicker := time.NewTicker(5 * time.Second) From 4f6568dec1976795cbd78d75517724b9cb78de49 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Mon, 10 Mar 2025 13:21:43 -0400 Subject: [PATCH 3/3] lower batchTimeout --- cmd/topicctl/subcmd/tester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/tester.go b/cmd/topicctl/subcmd/tester.go index f933daea..e6b97883 100644 --- a/cmd/topicctl/subcmd/tester.go +++ b/cmd/topicctl/subcmd/tester.go @@ -168,7 +168,7 @@ func runTestWriter(ctx context.Context) error { Balancer: &kafka.LeastBytes{}, Async: false, BatchSize: batchSize, - BatchTimeout: 1 * time.Millisecond, + BatchTimeout: 1 * time.Nanosecond, }, ) defer writer.Close()