Skip to content

Commit fa043d1

Browse files
author
Rifat Döver
committed
Changes for review mp911de#173 (comment)
1 parent fddc3d4 commit fa043d1

20 files changed

+230
-353
lines changed

src/main/java/biz/paluch/logging/gelf/PropertyProvider.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,5 @@ public interface PropertyProvider {
4040
String PROPERTY_LEVEL = "level";
4141
String PROPERTY_VERSION = "version";
4242

43-
String PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG = "kafkaProducerProperties";
44-
String PROPERTY_KAFKA_LOG_TOPIC = "kafkaLogTopic";
45-
4643
String getProperty(String propertyName);
4744
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package biz.paluch.logging.gelf.intern.sender;
2+
3+
/**
4+
* @author Rifat Döver
5+
*/
6+
public class KafkaContants {
7+
public static final String KAFKA_SCHEME = "kafka";
8+
public static final String KAFKA_LOG_TOPIC = "kafka.log.topic";
9+
}
Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,38 @@
11
package biz.paluch.logging.gelf.intern.sender;
22

3-
import biz.paluch.logging.gelf.PropertyProvider;
43
import biz.paluch.logging.gelf.intern.GelfSender;
54
import biz.paluch.logging.gelf.intern.GelfSenderConfiguration;
65
import biz.paluch.logging.gelf.intern.GelfSenderProvider;
76
import org.apache.kafka.clients.producer.KafkaProducer;
87
import org.apache.kafka.clients.producer.ProducerConfig;
98
import org.apache.kafka.common.serialization.ByteArraySerializer;
109

11-
import java.io.FileInputStream;
1210
import java.io.IOException;
1311
import java.util.Properties;
1412

13+
1514
/**
1615
* @author Rifat Döver
1716
* @since 1.13
1817
*
1918
*/
2019
public class KafkaGelfSenderProvider implements GelfSenderProvider {
21-
public static final String KAFKA_SCHEME = "kafka";
2220
@Override
2321
public boolean supports(String host) {
24-
return host != null && host.startsWith(KAFKA_SCHEME + ":");
22+
return host != null && host.startsWith(KafkaContants.KAFKA_SCHEME + ":");
2523
}
2624

2725
@Override
2826
public GelfSender create(GelfSenderConfiguration configuration) throws IOException {
29-
Properties props = new Properties();
30-
String logTopic = (String) configuration.getSpecificConfigurations().get(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC);
31-
32-
if(logTopic == null || logTopic.isEmpty())
33-
throw new IllegalArgumentException(
34-
String.format("Log topic must be specified. Please specify it with %s property",
35-
PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC)
36-
);
37-
38-
String location = (String) configuration.getSpecificConfigurations()
39-
.get(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG);
40-
if(location != null && !location.isEmpty())
41-
{
42-
props.load(new FileInputStream(location));
43-
}
27+
Properties props = QueryStringParser.parseKafkaHost(configuration.getHost());
28+
String kafkaLogTopic = props.getProperty(KafkaContants.KAFKA_LOG_TOPIC);
4429

30+
// Removing property for non standart warning.
31+
props.remove(KafkaContants.KAFKA_LOG_TOPIC);
4532
//Default Configurations
4633
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
4734
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
4835

49-
if(!props.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
50-
{
51-
String kafkaBrokerUrls = getKafkaBrokers(String.format("%s:%d", configuration.getHost(), configuration.getPort()));
52-
if(kafkaBrokerUrls.isEmpty()) {
53-
throw new IllegalArgumentException("Kafka brokers url must not be empty");
54-
}
55-
56-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrls);
57-
}
5836
if(!props.containsKey(ProducerConfig.ACKS_CONFIG)) {
5937
props.put(ProducerConfig.ACKS_CONFIG,"all");
6038
} else {
@@ -66,13 +44,6 @@ public GelfSender create(GelfSenderConfiguration configuration) throws IOExcepti
6644
props.put(ProducerConfig.RETRIES_CONFIG,2);
6745

6846
KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
69-
return new KafkaGelfSender(kafkaProducer, logTopic , configuration.getErrorReporter());
70-
}
71-
72-
private String getKafkaBrokers(String host) {
73-
if(host !=null && !host.isEmpty() && host.startsWith(KAFKA_SCHEME + ":")){
74-
return host.substring(6);
75-
}
76-
return "";
47+
return new KafkaGelfSender(kafkaProducer, kafkaLogTopic , configuration.getErrorReporter());
7748
}
7849
}

src/main/java/biz/paluch/logging/gelf/intern/sender/QueryStringParser.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package biz.paluch.logging.gelf.intern.sender;
22

3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.logging.log4j.util.Strings;
5+
36
import java.net.URI;
4-
import java.util.Collections;
5-
import java.util.HashMap;
6-
import java.util.Map;
7-
import java.util.StringTokenizer;
7+
import java.util.*;
88
import java.util.concurrent.TimeUnit;
99

1010
/**
@@ -119,4 +119,47 @@ public static String getHost(URI uri) {
119119
}
120120
return host;
121121
}
122+
123+
public static Properties parseKafkaHost(String host) {
124+
Properties properties = new Properties();
125+
String[] slices = host.split("[|]");
126+
if(slices.length > 3 || slices.length < 2) throwIllegalHostArgument(host);
127+
String kafkaBrokers = getKafkaBrokers(slices[0]);
128+
129+
//Parse brokers
130+
if(Strings.isEmpty(kafkaBrokers)) throwIllegalHostArgument(host);
131+
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
132+
133+
//Put log topic
134+
if(Strings.isEmpty(slices[1])) throwIllegalHostArgument(host);
135+
properties.put(KafkaContants.KAFKA_LOG_TOPIC, slices[1]);
136+
137+
//Put other properties
138+
if(slices.length > 2) {
139+
StringTokenizer st = new StringTokenizer(slices[2], "&;");
140+
while (st.hasMoreTokens()) {
141+
String queryParam = st.nextToken();
142+
int equalsIndex = queryParam.indexOf('=');
143+
if (equalsIndex != -1) {
144+
String key = queryParam.substring(0, equalsIndex);
145+
String value = queryParam.substring(equalsIndex + 1);
146+
147+
properties.put(key.toLowerCase(), value);
148+
}
149+
}
150+
}
151+
return properties;
152+
153+
}
154+
155+
private static String getKafkaBrokers(String host) {
156+
if(host !=null && !host.isEmpty() && host.startsWith(KafkaContants.KAFKA_SCHEME + ":")){
157+
return host.substring(6);
158+
}
159+
return "";
160+
}
161+
private static void throwIllegalHostArgument(String host) {
162+
throw new IllegalArgumentException(String.format("Provided host: %s invalid for format \n" +
163+
"kafka:[kafka_brokers]|[kafka_log_topic]|[options_as_query_parameters]",host));
164+
}
122165
}

src/main/java/biz/paluch/logging/gelf/jul/GelfLogHandler.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package biz.paluch.logging.gelf.jul;
22

3+
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
4+
5+
import java.util.Collections;
6+
import java.util.logging.*;
7+
38
import biz.paluch.logging.RuntimeContainer;
49
import biz.paluch.logging.gelf.GelfMessageAssembler;
510
import biz.paluch.logging.gelf.LogMessageField;
611
import biz.paluch.logging.gelf.PropertyProvider;
712
import biz.paluch.logging.gelf.intern.*;
813

9-
import java.util.HashMap;
10-
import java.util.Map;
11-
import java.util.logging.*;
12-
13-
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
14-
1514
/**
1615
* Logging-Handler for GELF (Graylog Extended Logging Format). This Java-Util-Logging Handler creates GELF Messages and posts
1716
* them using UDP (default) or TCP. Following parameters are supported/needed:
@@ -48,7 +47,6 @@ public class GelfLogHandler extends Handler implements ErrorReporter {
4847
protected volatile GelfSender gelfSender;
4948
protected GelfMessageAssembler gelfMessageAssembler;
5049
private final ErrorReporter errorReporter = new MessagePostprocessingErrorReporter(this);
51-
private Map <String,Object> senderSpecificConfiguration = new HashMap<String, Object>();
5250

5351
public GelfLogHandler() {
5452
super();
@@ -92,16 +90,6 @@ public GelfLogHandler() {
9290
} catch (final Exception e) {
9391
// ignore
9492
}
95-
96-
String kafkaLogTopic = propertyProvider.getProperty(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC);
97-
if (null != kafkaLogTopic) {
98-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC,kafkaLogTopic);
99-
}
100-
101-
String kafkaProviderProperties = propertyProvider.getProperty(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG);
102-
if (null != kafkaProviderProperties) {
103-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG,kafkaProviderProperties);
104-
}
10593
}
10694

10795
protected void initializeDefaultFields() {
@@ -152,7 +140,7 @@ public void publish(final LogRecord record) {
152140
}
153141

154142
protected GelfSender createGelfSender() {
155-
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, senderSpecificConfiguration);
143+
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, Collections.EMPTY_MAP);
156144
}
157145

158146
@Override

src/main/java/biz/paluch/logging/gelf/log4j/GelfLogAppender.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -263,20 +263,4 @@ public String getVersion() {
263263
public void setVersion(String version) {
264264
gelfMessageAssembler.setVersion(version);
265265
}
266-
267-
public String getKafkaLogTopic() {
268-
return (String) senderSpecificConfiguration.get(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC);
269-
}
270-
271-
public void setKafkaLogTopic(String kafkaLogTopic) {
272-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC,kafkaLogTopic);
273-
}
274-
275-
public String getKafkaProducerProperties() {
276-
return (String) senderSpecificConfiguration.get(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG);
277-
}
278-
279-
public void setKafkaProducerProperties(String kafkaProducerProperties) {
280-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG,kafkaProducerProperties);
281-
}
282266
}

src/main/java/biz/paluch/logging/gelf/log4j2/GelfLogAppender.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package biz.paluch.logging.gelf.log4j2;
22

3-
import biz.paluch.logging.RuntimeContainer;
4-
import biz.paluch.logging.gelf.*;
5-
import biz.paluch.logging.gelf.intern.*;
3+
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
4+
import static org.apache.logging.log4j.core.layout.PatternLayout.newBuilder;
5+
6+
import java.util.Collections;
7+
68
import org.apache.logging.log4j.Logger;
79
import org.apache.logging.log4j.core.Filter;
810
import org.apache.logging.log4j.core.LogEvent;
@@ -15,11 +17,9 @@
1517
import org.apache.logging.log4j.status.StatusLogger;
1618
import org.apache.logging.log4j.util.Strings;
1719

18-
import java.util.HashMap;
19-
import java.util.Map;
20-
21-
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
22-
import static org.apache.logging.log4j.core.layout.PatternLayout.newBuilder;
20+
import biz.paluch.logging.RuntimeContainer;
21+
import biz.paluch.logging.gelf.*;
22+
import biz.paluch.logging.gelf.intern.*;
2323

2424
/**
2525
* Logging-Handler for GELF (Graylog Extended Logging Format). This Java-Util-Logging Handler creates GELF Messages and posts
@@ -185,13 +185,11 @@ public void reportError(String message, Exception e) {
185185
protected GelfSender gelfSender;
186186
private final MdcGelfMessageAssembler gelfMessageAssembler;
187187
private final ErrorReporter errorReporter;
188-
private final Map<String, Object> senderSpecificConfiguration;
189188

190-
public GelfLogAppender(String name, Filter filter, MdcGelfMessageAssembler gelfMessageAssembler, boolean ignoreExceptions, Map<String, Object> senderSpecificConfiguration) {
189+
public GelfLogAppender(String name, Filter filter, MdcGelfMessageAssembler gelfMessageAssembler, boolean ignoreExceptions) {
191190

192191
super(name, filter, null, ignoreExceptions);
193192
this.gelfMessageAssembler = gelfMessageAssembler;
194-
this.senderSpecificConfiguration = senderSpecificConfiguration;
195193

196194
ErrorReporter errorReporter = getErrorReporter(ignoreExceptions);
197195

@@ -215,14 +213,11 @@ public static GelfLogAppender createAppender(@PluginConfiguration final Configur
215213
@PluginAttribute("mdcProfiling") String mdcProfiling,
216214
@PluginAttribute("maximumMessageSize") String maximumMessageSize,
217215
@PluginAttribute("additionalFieldTypes") String additionalFieldTypes,
218-
@PluginAttribute("kafkaLogTopic") String kafkaLogTopic,
219-
@PluginAttribute("kafkaProducerProperties") String kafkaProducerProperties,
220216
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) boolean ignoreExceptions) {
221217

222218
RuntimeContainer.initialize(ERROR_REPORTER);
223219

224220
MdcGelfMessageAssembler mdcGelfMessageAssembler = new MdcGelfMessageAssembler();
225-
Map<String, Object> senderSpecificConfiguration = new HashMap<String, Object>();
226221

227222
if (name == null) {
228223
LOGGER.error("No name provided for " + GelfLogAppender.class.getSimpleName());
@@ -234,14 +229,6 @@ public static GelfLogAppender createAppender(@PluginConfiguration final Configur
234229
return null;
235230
}
236231

237-
if (Strings.isNotEmpty(kafkaLogTopic)) {
238-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC,kafkaLogTopic);
239-
}
240-
241-
if (Strings.isNotEmpty(kafkaProducerProperties)) {
242-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG,kafkaProducerProperties);
243-
}
244-
245232
if (Strings.isNotEmpty(host)) {
246233
mdcGelfMessageAssembler.setHost(host);
247234
}
@@ -299,7 +286,7 @@ public static GelfLogAppender createAppender(@PluginConfiguration final Configur
299286

300287
configureFields(mdcGelfMessageAssembler, fields, dynamicFieldArray);
301288

302-
return new GelfLogAppender(name, filter, mdcGelfMessageAssembler, ignoreExceptions, senderSpecificConfiguration);
289+
return new GelfLogAppender(name, filter, mdcGelfMessageAssembler, ignoreExceptions);
303290
}
304291

305292
/**
@@ -390,6 +377,6 @@ public void start() {
390377
}
391378

392379
protected GelfSender createGelfSender() {
393-
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, senderSpecificConfiguration);
380+
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, Collections.EMPTY_MAP);
394381
}
395382
}

src/main/java/biz/paluch/logging/gelf/logback/GelfLogbackAppender.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
package biz.paluch.logging.gelf.logback;
22

3+
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
4+
5+
import java.util.Collections;
6+
37
import biz.paluch.logging.RuntimeContainer;
48
import biz.paluch.logging.gelf.LogMessageField;
59
import biz.paluch.logging.gelf.MdcGelfMessageAssembler;
6-
import biz.paluch.logging.gelf.PropertyProvider;
710
import biz.paluch.logging.gelf.intern.*;
811
import ch.qos.logback.classic.spi.ILoggingEvent;
912
import ch.qos.logback.core.AppenderBase;
1013

11-
import java.util.HashMap;
12-
import java.util.Map;
13-
14-
import static biz.paluch.logging.gelf.LogMessageField.NamedLogField.*;
15-
1614
/**
1715
* Logging-Handler for GELF (Graylog Extended Logging Format). This Logback Handler creates GELF Messages and posts them using
1816
* UDP (default) or TCP. Following parameters are supported/needed:
@@ -69,7 +67,6 @@ public class GelfLogbackAppender extends AppenderBase<ILoggingEvent> implements
6967
protected GelfSender gelfSender;
7068
protected MdcGelfMessageAssembler gelfMessageAssembler;
7169
private final ErrorReporter errorReporter = new MessagePostprocessingErrorReporter(this);
72-
private final Map<String, Object> senderSpecificConfiguration = new HashMap<String, Object>();
7370

7471
public GelfLogbackAppender() {
7572
super();
@@ -123,7 +120,7 @@ public void stop() {
123120
}
124121

125122
protected GelfSender createGelfSender() {
126-
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, senderSpecificConfiguration);
123+
return GelfSenderFactory.createSender(gelfMessageAssembler, errorReporter, Collections.EMPTY_MAP);
127124
}
128125

129126
@Override
@@ -262,21 +259,4 @@ public String getVersion() {
262259
public void setVersion(String version) {
263260
gelfMessageAssembler.setVersion(version);
264261
}
265-
266-
public String getKafkaLogTopic() {
267-
return (String) senderSpecificConfiguration.get(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC);
268-
}
269-
270-
public void setKafkaLogTopic(String kafkaLogTopic) {
271-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_LOG_TOPIC,kafkaLogTopic);
272-
}
273-
274-
public String getKafkaProducerProperties() {
275-
return (String) senderSpecificConfiguration.get(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG);
276-
}
277-
278-
public void setKafkaProducerProperties(String kafkaProducerProperties) {
279-
senderSpecificConfiguration.put(PropertyProvider.PROPERTY_KAFKA_PRODUCER_PROPERTIES_CONFIG,kafkaProducerProperties);
280-
}
281-
282262
}

0 commit comments

Comments
 (0)