This repository was archived by the owner on Jun 29, 2023. It is now read-only.

Commit f8549e4

Rifat Döver authored and mp911de committed
Gelf Kafka sender implementation #173
Log events can now be shipped using Kafka. Appenders can configure a Kafka URL in the host field according to the scheme kafka://broker[:port]?[producer_properties]#[log-topic], e.g. kafka://localhost#topic-log. Kafka internally uses logging and JMX components that log themselves, so these logging categories should be excluded to prevent circular, recursive log calls. Original pull request: #173.
1 parent ee613f5 commit f8549e4

20 files changed: +945 −10 lines

README.md

Lines changed: 163 additions & 1 deletion
@@ -418,7 +418,168 @@ logback.xml Example:
</root>
</configuration>
```

# Kafka transport for logstash-gelf

logstash-gelf can be used together with [Kafka](https://kafka.apache.org/) for shipping log events.

The URI used as the connection property is a `java.net.URI`. The minimal URI must contain at least a host and a fragment (the topic name). The URL also allows specifying more than one broker, but in that case you must define the ports inside the URL.

    kafka://broker[:port]?[producer_properties]#[log-topic]
Example:

    kafka://localhost#topic-log
    kafka://localhost:9092#topic-log
    kafka://localhost:9092,localhost:9093,localhost:9094#topic-log
    kafka://localhost?acks=all#topic-log
    kafka://localhost:19092?acks=1&max.block.ms=1000&transaction.timeout.ms=1000&request.timeout.ms=1000#kafka-log-topic
* scheme (fixed: `kafka`; used directly to determine the sender class)
* host (variable: the host your Kafka broker runs on)
* port (variable: the port your Kafka broker runs on)
* query (variable: Kafka producer config properties, usually defined in the `producer.properties` file)
* fragment (variable: the topic log messages are sent to)

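To make the mapping above concrete, here is a minimal sketch showing how `java.net.URI` decomposes one of the example URLs; the comments show what the standard JDK parser returns:

```java
import java.net.URI;

public class KafkaUriDemo {

    public static void main(String[] args) {
        URI uri = URI.create("kafka://localhost:19092?acks=1&max.block.ms=1000#kafka-log-topic");

        System.out.println(uri.getScheme());   // kafka -> selects the Kafka sender
        System.out.println(uri.getHost());     // localhost -> broker host
        System.out.println(uri.getPort());     // 19092 -> broker port
        System.out.println(uri.getQuery());    // acks=1&max.block.ms=1000 -> producer properties
        System.out.println(uri.getFragment()); // kafka-log-topic -> log topic
    }
}
```
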
Sample log4j2 configuration:
```xml
<Configuration packages="biz.paluch.logging.gelf.log4j2">
    <Appenders>
        <Gelf name="gelf" host="kafka://localhost#kafka-log-topic" port="9093" version="1.1" extractStackTrace="true"
              filterStackTrace="true" mdcProfiling="true" includeFullMdc="true" maximumMessageSize="8192"
              originHost="%host{fqdn}" additionalFieldTypes="fieldName1=String,fieldName2=Double,fieldName3=Long">
            <Field name="timestamp" pattern="%d{dd MMM yyyy HH:mm:ss,SSS}" />
            <Field name="level" pattern="%level" />
            <Field name="simpleClassName" pattern="%C{1}" />
            <Field name="className" pattern="%C" />
            <Field name="server" pattern="%host" />
            <Field name="server.fqdn" pattern="%host{fqdn}" />

            <!-- This is a static field -->
            <Field name="fieldName2" literal="fieldValue2" />

            <!-- This is a field using MDC -->
            <Field name="mdcField2" mdc="mdcField2" />
            <DynamicMdcFields regex="mdc.*" />
            <DynamicMdcFields regex="(mdc|MDC)fields" />
        </Gelf>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="gelf" />
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
```
Sample Spring logback configuration:
```xml
<!DOCTYPE configuration>

<configuration>
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <contextName>test</contextName>
    <jmxConfigurator/>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </pattern>
        </encoder>
    </appender>
    <appender name="gelfUdp" class="biz.paluch.logging.gelf.logback.GelfLogbackAppender">
        <host>udp:localhost</host>
        <port>12201</port>
        <version>1.1</version>
        <facility>java-test</facility>
        <extractStackTrace>true</extractStackTrace>
        <filterStackTrace>true</filterStackTrace>
        <mdcProfiling>false</mdcProfiling>
        <timestampPattern>yyyy-MM-dd HH:mm:ss,SSS</timestampPattern>
        <maximumMessageSize>8192</maximumMessageSize>
        <!-- These are static fields -->
        <additionalFields>fieldName1=fieldValue1,fieldName2=fieldValue2</additionalFields>
        <!-- Optional: Specify field types -->
        <additionalFieldTypes>fieldName1=String,fieldName2=Double,fieldName3=Long</additionalFieldTypes>

        <!-- These are fields using MDC -->
        <mdcFields>mdcField1,mdcField2</mdcFields>
        <dynamicMdcFields>mdc.*,(mdc|MDC)fields</dynamicMdcFields>
        <includeFullMdc>true</includeFullMdc>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ALL</level>
        </filter>
    </appender>

    <appender name="gelfKafka" class="biz.paluch.logging.gelf.logback.GelfLogbackAppender">
        <host>kafka://localhost:9092?acks=all&amp;max.block.ms=1000&amp;transaction.timeout.ms=1000&amp;request.timeout.ms=1000#kafka-log-topic</host>
        <port></port>
        <version>1.1</version>
        <facility>java-test</facility>
        <extractStackTrace>true</extractStackTrace>
        <filterStackTrace>true</filterStackTrace>
        <mdcProfiling>false</mdcProfiling>
        <timestampPattern>yyyy-MM-dd HH:mm:ss,SSS</timestampPattern>
        <maximumMessageSize>8192</maximumMessageSize>

        <!-- These are static fields -->
        <additionalFields>fieldName1=fieldValue1,fieldName2=fieldValue2</additionalFields>
        <!-- Optional: Specify field types -->
        <additionalFieldTypes>fieldName1=String,fieldName2=Double,fieldName3=Long</additionalFieldTypes>

        <!-- These are fields using MDC -->
        <mdcFields>mdcField1,mdcField2</mdcFields>
        <dynamicMdcFields>mdc.*,(mdc|MDC)fields</dynamicMdcFields>
        <includeFullMdc>true</includeFullMdc>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>

    <root level="INFO">
        <appender-ref ref="gelfKafka" />
    </root>
    <logger name="org.apache.kafka" level="ALL" additivity="false">
        <appender-ref ref="gelfUdp" />
        <appender-ref ref="STDOUT" />
    </logger>
    <logger name="javax.management" level="ALL" additivity="false">
        <appender-ref ref="gelfUdp" />
        <appender-ref ref="STDOUT" />
    </logger>
</configuration>
```
**Limitations**

Some configurations are overridden or set by default:
- acks: defaults to `all`; a configured value of `0` is raised to `1` so that log messages are acknowledged
- retries: defaults to `2`
- value.serializer: always overridden with `ByteArraySerializer`
- key.serializer: always overridden with `ByteArraySerializer`
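The defaults above can be condensed into a short sketch (taken from the `KafkaGelfSenderProvider` change in this commit; the `ProducerConfig` keys are the standard kafka-clients constants):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KafkaDefaultsDemo {

    public static void main(String[] args) {
        // Properties as they would arrive from the URL query, e.g. kafka://localhost?acks=0#topic-log
        Properties props = new Properties();
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");

        // Serializers are always overridden with byte-array serializers.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // acks defaults to "all"; a configured 0 is raised to 1 so sends are acknowledged.
        if (!props.containsKey(ProducerConfig.ACKS_CONFIG)) {
            props.put(ProducerConfig.ACKS_CONFIG, "all");
        } else if ("0".equalsIgnoreCase(props.getProperty(ProducerConfig.ACKS_CONFIG))) {
            props.put(ProducerConfig.ACKS_CONFIG, "1");
        }

        // retries defaults to 2 when not configured.
        if (!props.containsKey(ProducerConfig.RETRIES_CONFIG)) {
            props.put(ProducerConfig.RETRIES_CONFIG, 2);
        }

        System.out.println(props); // acks=1, retries=2, byte-array key/value serializers
    }
}
```
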
**When using with SLF4J/Logback/Spring:**

When you are using Logback/SLF4J/Spring, do not use the Kafka sender for loggers in the `org.apache.kafka` and `javax.management` packages: [KafkaProducer](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) itself uses loggers from these packages, so routing them through Kafka creates a cyclic dependency. You can use another logstash-gelf appender, such as UDP or TCP, to ship these logs to Graylog.

You can set these loggers by changing the appender-ref this way:
```xml
<logger name="org.apache.kafka" level="ALL" additivity="false">
    <appender-ref ref="gelfUdp" />
    <appender-ref ref="STDOUT" />
</logger>
<logger name="javax.management" level="ALL" additivity="false">
    <appender-ref ref="gelfUdp" />
    <appender-ref ref="STDOUT" />
</logger>
```
Versions/Dependencies
---------------------
This project is built against the following dependencies/versions:

@@ -427,8 +588,9 @@
 * log4j2 2.9.1
 * Java Util Logging JDK Version 1.6
 * logback 1.1.3
-* slf4j-api 1.7.13
+* slf4j-api 1.7.25
 * jedis 2.9.0 (includes commons-pool2 2.4.3)
+* kafka-clients 2.1.0

License
-------

pom.xml

Lines changed: 16 additions & 1 deletion
@@ -63,7 +63,7 @@
 <jackson.version>2.9.8</jackson.version>
 <jedis.version>3.0.1</jedis.version>
 <jedis-commons-pool2.version>2.4.3</jedis-commons-pool2.version>
-<slf4j-api.version>1.7.13</slf4j-api.version>
+<slf4j-api.version>1.7.25</slf4j-api.version>
 <logback-classic.version>1.1.3</logback-classic.version>
 <org.jboss.logmanager.version>1.5.2.Final</org.jboss.logmanager.version>
 <arquillian.version>1.4.0.Final</arquillian.version>
@@ -85,6 +85,7 @@
 <maven-scm-provider-gitexe.version>1.9.5</maven-scm-provider-gitexe.version>
 <maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
 <maven.jacoco.version>0.7.9</maven.jacoco.version>
+<kafka-clients.version>2.1.0</kafka-clients.version>
 </properties>

 <developers>
@@ -391,6 +392,13 @@
 <version>${jackson.version}</version>
 <scope>test</scope>
 </dependency>
+<!-- kafka-junit dependency -->
+<dependency>
+    <groupId>com.github.charithe</groupId>
+    <artifactId>kafka-junit</artifactId>
+    <version>4.1.3</version>
+    <scope>test</scope>
+</dependency>

 <!-- Redis Client -->
 <dependency>
@@ -399,6 +407,13 @@
 <version>${jedis.version}</version>
 <optional>true</optional>
 </dependency>
+<!-- Kafka Client -->
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka-clients</artifactId>
+    <version>${kafka-clients.version}</version>
+    <optional>true</optional>
+</dependency>

 <dependency>
 <groupId>org.apache.commons</groupId>

src/main/java/biz/paluch/logging/gelf/intern/GelfSenderFactory.java

Lines changed: 6 additions & 8 deletions
@@ -1,16 +1,13 @@
 package biz.paluch.logging.gelf.intern;

+import biz.paluch.logging.gelf.intern.sender.DefaultGelfSenderProvider;
+import biz.paluch.logging.gelf.intern.sender.KafkaGelfSenderProvider;
+import biz.paluch.logging.gelf.intern.sender.RedisGelfSenderProvider;
+
 import java.io.IOException;
 import java.net.SocketException;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-import biz.paluch.logging.gelf.intern.sender.DefaultGelfSenderProvider;
-import biz.paluch.logging.gelf.intern.sender.RedisGelfSenderProvider;
+import java.util.*;

 /**
  * Factory to create a {@link GelfSender} based on the host and protocol details. This factory uses Java's {@link ServiceLoader}
@@ -118,6 +115,7 @@ private static class SenderProviderHolder {
             providerList.add(iter.next());
         }
         providerList.add(new RedisGelfSenderProvider());
+        providerList.add(new KafkaGelfSenderProvider());
         providerList.add(new DefaultGelfSenderProvider());
     }
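The factory javadoc notes that providers are also discovered via Java's `ServiceLoader`, in addition to the built-in ones registered above. A custom provider would then presumably be registered through the standard services file (illustrative sketch; `com.example.MyGelfSenderProvider` is a made-up class):

```
# src/main/resources/META-INF/services/biz.paluch.logging.gelf.intern.GelfSenderProvider
com.example.MyGelfSenderProvider
```
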

src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaContants.java

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
package biz.paluch.logging.gelf.intern.sender;

/**
 * @author Rifat Döver
 */
public class KafkaContants {
    public static final String KAFKA_SCHEME = "kafka";
    public static final String KAFKA_LOG_TOPIC = "kafka.log.topic";
}
src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSender.java

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
package biz.paluch.logging.gelf.intern.sender;

import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

/**
 * Gelf sender for Kafka. Publishes the GELF message JSON to a Kafka topic.
 *
 * @author Rifat Döver
 * @since 1.13
 */
public class KafkaGelfSender implements GelfSender {

    private KafkaProducer<byte[], byte[]> kafkaProducer;
    private String topicName;
    private ErrorReporter errorReporter;

    public KafkaGelfSender(KafkaProducer<byte[], byte[]> kafkaProducer, String topicName, ErrorReporter errorReporter) {
        this.kafkaProducer = kafkaProducer;
        this.topicName = topicName;
        this.errorReporter = errorReporter;
    }

    @Override
    public boolean sendMessage(GelfMessage message) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, message.toJson().getBytes());
        boolean hasOffset;
        try {
            // Blocks until the broker acknowledges the record; success means it got an offset.
            Future<RecordMetadata> metadata = kafkaProducer.send(record);
            hasOffset = metadata.get().hasOffset();
        } catch (Exception e) {
            errorReporter.reportError("Error sending log to kafka", e);
            return false;
        }
        return hasOffset;
    }

    @Override
    public void close() {
        kafkaProducer.close();
    }
}
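For orientation, a minimal sketch of wiring this sender up by hand; normally `GelfSenderFactory` does this through `KafkaGelfSenderProvider`, and the broker address, topic, and `ErrorReporter` implementation here are illustrative assumptions:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import biz.paluch.logging.gelf.intern.ErrorReporter;

public class KafkaGelfSenderDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // Report send failures to stderr; appenders normally pass their own ErrorReporter.
        ErrorReporter reporter = new ErrorReporter() {
            @Override
            public void reportError(String message, Exception e) {
                System.err.println(message + ": " + e);
            }
        };

        KafkaGelfSender sender = new KafkaGelfSender(producer, "kafka-log-topic", reporter);
        try {
            // sender.sendMessage(gelfMessage) would publish the message JSON to the topic.
        } finally {
            sender.close();
        }
    }
}
```
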
src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSenderProvider.java

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
package biz.paluch.logging.gelf.intern.sender;

import biz.paluch.logging.gelf.intern.GelfSender;
import biz.paluch.logging.gelf.intern.GelfSenderConfiguration;
import biz.paluch.logging.gelf.intern.GelfSenderProvider;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.logging.log4j.util.Strings;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Properties;

/**
 * @author Rifat Döver
 * @since 1.13
 */
public class KafkaGelfSenderProvider implements GelfSenderProvider {

    private static final int BROKER_DEFAULT_PORT = 9092;

    @Override
    public boolean supports(String host) {
        return host != null && host.startsWith(KafkaContants.KAFKA_SCHEME + ":");
    }

    @Override
    public GelfSender create(GelfSenderConfiguration configuration) throws IOException {
        URI uri = URI.create(configuration.getHost());
        Map<String, String> options = QueryStringParser.parse(uri);

        // Producer properties from the query part of the URL.
        Properties props = new Properties();
        for (String key : options.keySet()) {
            props.setProperty(key, options.get(key));
        }

        String brokers = getBrokerServers(configuration);
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);

        String kafkaLogTopic = getTopic(uri);

        // Default configurations: serializers are always byte arrays.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // acks defaults to "all"; acks=0 is raised to 1 so sends are acknowledged.
        if (!props.containsKey(ProducerConfig.ACKS_CONFIG)) {
            props.put(ProducerConfig.ACKS_CONFIG, "all");
        } else {
            String acks = props.getProperty(ProducerConfig.ACKS_CONFIG);
            acks = acks.equalsIgnoreCase("0") ? "1" : acks;
            props.put(ProducerConfig.ACKS_CONFIG, acks);
        }
        if (!props.containsKey(ProducerConfig.RETRIES_CONFIG)) {
            props.put(ProducerConfig.RETRIES_CONFIG, 2);
        }

        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
        return new KafkaGelfSender(kafkaProducer, kafkaLogTopic, configuration.getErrorReporter());
    }

    private String getBrokerServers(GelfSenderConfiguration configuration) {
        URI uri = URI.create(configuration.getHost());
        String brokers;
        if (uri.getHost() != null) {
            // Single broker: port from the URI, the appender configuration, or the default.
            brokers = uri.getHost();
            int port = uri.getPort() > 0 ? uri.getPort()
                    : configuration.getPort() > 0 ? configuration.getPort() : BROKER_DEFAULT_PORT;
            brokers += ":" + port;
        } else {
            // Multiple brokers: the comma-separated list is only available via the raw authority.
            brokers = uri.getAuthority();
        }
        if (Strings.isEmpty(brokers)) {
            throw new IllegalArgumentException("Kafka URI must specify bootstrap.servers.");
        }
        return brokers;
    }

    private String getTopic(URI uri) {
        if (Strings.isEmpty(uri.getFragment())) {
            throw new IllegalArgumentException("Kafka URI must specify log topic as fragment.");
        }
        return uri.getFragment();
    }
}
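One detail of `getBrokerServers` worth spelling out: `java.net.URI` cannot parse a comma-separated broker list as a host/port pair, so the list only surfaces through `getAuthority()`. This is also why the README requires explicit ports in multi-broker URLs. A small illustration of the JDK's parsing behavior:

```java
import java.net.URI;

public class MultiBrokerUriDemo {

    public static void main(String[] args) {
        URI single = URI.create("kafka://localhost:9092#topic-log");
        System.out.println(single.getHost());     // localhost
        System.out.println(single.getPort());     // 9092

        URI multi = URI.create("kafka://host1:9092,host2:9093#topic-log");
        System.out.println(multi.getHost());      // null - a comma is not valid in a hostname
        System.out.println(multi.getAuthority()); // host1:9092,host2:9093
    }
}
```
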
