
Commit 2ba30cc

KAFKA-19574: Improve producer and consumer config files (#20302)
This is an attempt at improving the client configuration files. We now have sections and comments similar to the other properties files.

Reviewers: Kirk True <[email protected]>, Luke Chen <[email protected]>

Signed-off-by: Federico Valeri <[email protected]>
1 parent 342a8e6 commit 2ba30cc

File tree

4 files changed (+285, -30 lines)


clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java

Lines changed: 25 additions & 0 deletions
@@ -30,6 +30,8 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;

+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Locale;
@@ -41,6 +43,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 public class ConsumerConfigTest {

@@ -256,4 +259,26 @@ private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName,
         assertEquals(configName + " cannot be set when " +
             ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
     }
+
+    /**
+     * Validates config/consumer.properties file to avoid getting out of sync with ConsumerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/consumer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/consumer.properties file: " + e.getMessage());
+        }
+
+        ConsumerConfig config = new ConsumerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ConsumerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }

clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java

Lines changed: 26 additions & 0 deletions
@@ -26,14 +26,18 @@

 import org.junit.jupiter.api.Test;

+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;

 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 public class ProducerConfigTest {

@@ -168,4 +172,26 @@ void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
         configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
         assertDoesNotThrow(() -> new ProducerConfig(configs));
     }
+
+    /**
+     * Validates config/producer.properties file to avoid getting out of sync with ProducerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/producer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/producer.properties file: " + e.getMessage());
+        }
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ProducerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }

config/consumer.properties

Lines changed: 121 additions & 9 deletions
@@ -4,23 +4,135 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-#
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+# See org.apache.kafka.clients.consumer.ConsumerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
+
+##################### Consumer Basics #######################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092

-# consumer group id
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-consumer
+
+##################### Transaction Support #####################
+
+# Isolation level for reading messages.
+# Options: read_uncommitted (default), read_committed (for exactly-once semantics).
+isolation.level=read_uncommitted
+
+##################### Consumer Group Configuration #####################
+
+# Unique identifier for this consumer group.
+# All consumers with the same group.id will share partition consumption.
 group.id=test-consumer-group

-# What to do when there is no initial offset in Kafka or if the current
-# offset does not exist any more on the server: latest, earliest, none
-#auto.offset.reset=
+# What to do when there is no initial offset or if the current offset no longer exists.
+# Options: earliest (from beginning), latest (from end), none (throw exception).
+# Use 'earliest' to avoid data loss on first run.
+auto.offset.reset=earliest
+
+##################### Partition Assignment Strategy #####################
+
+# Strategy for assigning partitions to consumers in a group.
+# Options: RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor.
+# CooperativeStickyAssignor is recommended (requires Kafka 2.4+).
+partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
+
+##################### Deserialization #####################
+
+# Deserializer class for message keys.
+# Common options: StringDeserializer, ByteArrayDeserializer, AvroDeserializer.
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# Deserializer class for message values.
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+##################### Offset Management #####################
+
+# Whether to automatically commit offsets in the background.
+# Set to false for manual offset management and exactly-once processing.
+enable.auto.commit=true
+
+# Frequency (in milliseconds) at which offsets are auto-committed.
+# Lower values provide better fault tolerance but increase broker load.
+auto.commit.interval.ms=5000
+
+##################### Classic Group Session Management #####################
+
+# Timeout for detecting consumer failures when using group management.
+# Must be between group.min.session.timeout.ms and group.max.session.timeout.ms (broker config).
+session.timeout.ms=30000
+
+# Expected time between heartbeats when using group management.
+# Should be lower than session.timeout.ms (typically 1/3 of session timeout).
+heartbeat.interval.ms=10000
+
+# Maximum time between successive calls to poll().
+# If exceeded, consumer is considered failed and partition rebalancing occurs.
+max.poll.interval.ms=300000
+
+##################### Retry And Error Handling #####################
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Total time to wait for a response to a request.
+request.timeout.ms=40000
+
+# Close idle connections after this many milliseconds.
+connections.max.idle.ms=540000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Minimum data size (bytes) and maximum polling timeout (ms).
+# Whichever condition is met first will trigger the fetch operation.
+# Balances response latency against message batching efficiency.
+# For remote partition fetching, configure remote.fetch.max.wait.ms instead.
+fetch.min.bytes=1
+fetch.max.wait.ms=500
+
+# Set soft limits to the amount of bytes per fetch request and partition.
+# Both max.partition.fetch.bytes and fetch.max.bytes limits can be exceeded when
+# the first batch in the first non-empty partition is larger than the configured
+# value to ensure that the consumer can make progress.
+# Configuring message.max.bytes (broker config) or max.message.bytes (topic config)
+# <= fetch.max.bytes prevents oversized fetch responses.
+fetch.max.bytes=52428800
+max.partition.fetch.bytes=1048576
+
+# Maximum number of records returned in a single poll() call.
+# Higher values increase throughput but may cause longer processing delays.
+max.poll.records=500
config/producer.properties

Lines changed: 113 additions & 21 deletions
@@ -12,35 +12,127 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.producer.ProducerConfig for more details

-############################# Producer Basics #############################
+# See org.apache.kafka.clients.producer.ProducerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.

-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+##################### Producer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092

-# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
-compression.type=none
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-producer
+
+##################### Transaction Support #####################
+
+# Transactional ID for the producer.
+# Must be unique across all producer instances.
+# Enables exactly-once semantics across multiple partitions/topics.
+#transactional.id=test-transactional-id
+
+# Maximum amount of time in milliseconds that a transaction will remain open.
+# Only applies when transactional.id is set.
+transaction.timeout.ms=60000
+
+##################### Partitioning #####################
+
+# Name of the partitioner class for partitioning records.
+# Default uses "sticky" partitioning, which improves throughput by filling batches.
+# Options: DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner.
+#partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
+
+##################### Serialization #####################
+
+# Serializer class for message keys.
+# Common options: StringSerializer, ByteArraySerializer, AvroSerializer.
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Serializer class for message values.
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+##################### Reliability And Durability #####################
+
+# Number of acknowledgments the producer requires the leader to have received.
+# Options: 0 (no ack), 1 (leader only), all/-1 (all in-sync replicas).
+# Use 'all' for maximum durability.
+acks=all

-# name of the partitioner class for partitioning records;
-# The default uses "sticky" partitioning logic which spreads the load evenly between partitions, but improves throughput by attempting to fill the batches sent to each partition.
-#partitioner.class=
+# Number of retries for failed sends.
+# Set to high value or Integer.MAX_VALUE for maximum reliability.
+retries=2147483647

-# the maximum amount of time the client will wait for the response of a request
-#request.timeout.ms=
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000

-# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
-#max.block.ms=
+# Enable idempotent producer to prevent duplicate messages.
+# Ensures exactly-once delivery semantics when combined with proper consumer settings.
+enable.idempotence=true
+
+# Maximum number of unacknowledged requests the client will send on a single connection.
+# Must be <= 5 when enable.idempotence=true to maintain ordering guarantees.
+max.in.flight.requests.per.connection=5
+
+##################### Timeouts And Blocking #####################
+
+# Maximum amount of time the client will wait for the response of a request.
+# Should be higher than replica.lag.time.max.ms (broker config).
+request.timeout.ms=30000
+
+# How long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.
+# Should be higher than request.timeout.ms.
+max.block.ms=60000
+
+# Timeout for broker requests, including produce requests.
+# Should be greater than or equal to the sum of request.timeout.ms and linger.ms.
+delivery.timeout.ms=120000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Compression codec for all data generated.
+# Options: none, gzip, snappy, lz4, zstd.
+# Can greatly improve throughput at the cost of increased CPU usage.
+compression.type=none

-# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
-#linger.ms=
+# Producer will wait up to this delay to batch records together.
+# Higher values increase throughput but add latency.
+# Set to 0 for lowest latency, 5-100ms for balanced throughput/latency.
+linger.ms=5

-# the maximum size of a request in bytes
-#max.request.size=
+# Default batch size in bytes when batching multiple records sent to a partition.
+# Larger batches improve throughput but use more memory.
+# 16KB is a good starting point, adjust based on message size and throughput needs.
+batch.size=16384

-# the default batch size in bytes when batching multiple records sent to a partition
-#batch.size=
+# Total bytes of memory the producer can use to buffer records waiting to be sent.
+# Should be larger than batch.size * number of partitions you're writing to.
+# 32MB is reasonable for most use cases.
+buffer.memory=33554432

-# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
-#buffer.memory=
+# Maximum size of a request in bytes.
+# Should accommodate your largest batch size plus overhead.
+# 1MB is default and suitable for most cases.
+max.request.size=1048576
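
Likewise, a minimal sketch (not part of this commit) of loading the updated producer.properties into a KafkaProducer and sending one record. The relative file path and the "test-topic" topic name are assumptions; the <String, String> typing matches the StringSerializer settings in the file.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class ProducerPropertiesExample {
    public static void main(String[] args) throws Exception {
        // Load the shipped properties file; path is an assumption (run from the repo root).
        Properties props = new Properties();
        try (InputStream in = new FileInputStream("config/producer.properties")) {
            props.load(in);
        }

        // Create the producer from the loaded properties and send a single record.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata metadata =
                producer.send(new ProducerRecord<>("test-topic", "key", "value")).get(); // hypothetical topic
            System.out.printf("Sent to %s-%d at offset %d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}

Because the file leaves transactional.id commented out, a plain send() as above is sufficient; enabling it would additionally require initTransactions() and an explicit transaction around the send.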
