This repository was archived by the owner on Jun 29, 2023. It is now read-only.

Commit a5113d4

Polishing #173

Remove Kafka transport documentation from the README, as transports are documented on the site. Add timeouts to Kafka send futures. Simplify tests. Remove the log4j test, as log4j 1.x is out of maintenance. Make fields final where possible. Add author tags, fix javadoc tags.

Original pull request: #173
1 parent f8549e4 commit a5113d4

18 files changed: +197 −433 lines

README.md
Lines changed: 0 additions & 161 deletions

@@ -418,168 +418,7 @@ logback.xml Example:
     </root>
 </configuration>
 ```
-# Kafka transport for logstash-gelf
 
-
-logstash-gelf can be used together with [Kafka](https://kafka.apache.org/) for shipping log events.
-
-The URI used as connection property is a java.net.URI. The minimal URI must contain at least a host and
-the Fragment (Topic Name).
-The URL allows to specify one or more brokers but in this case you must define ports inside URL.
-
-    kafka://broker[:port]?[producer_properties]#[log-topic]
-
-Example:
-
-    kafka://localhost#topic-log
-    kafka://localhost:9092#topic-log
-    kafka://localhost:9092,localhost:9093,localhost:9094#topic-log
-    kafka://localhost?acks=all#topic-log
-    kafka://localhost:19092?acks=1&max.block.ms=1000&transaction.timeout.ms=1000&request.timeout.ms=1000#kafka-log-topic
-
-* scheme (fixed: Kafka, directly used to determine the to be used sender class)
-* host (variable: the host your Kafka broker runs on)
-* port (variable: the port your Kafka broker runs on)
-* query (variable: kafka producer config properties which is usually defined inside producer.properties file)
-* fragment (variable: the topic we send log messages on)
-
-Sample log4J2 configuration:
-```xml
-<Configuration packages="biz.paluch.logging.gelf.log4j2">
-    <Appenders>
-        <Gelf name="gelf" host="kafka://localhost#kafka-log-topic" port="9093" version="1.1" extractStackTrace="true"
-              filterStackTrace="true" mdcProfiling="true" includeFullMdc="true" maximumMessageSize="8192"
-              originHost="%host{fqdn}" additionalFieldTypes="fieldName1=String,fieldName2=Double,fieldName3=Long">
-            <Field name="timestamp" pattern="%d{dd MMM yyyy HH:mm:ss,SSS}" />
-            <Field name="level" pattern="%level" />
-            <Field name="simpleClassName" pattern="%C{1}" />
-            <Field name="className" pattern="%C" />
-            <Field name="server" pattern="%host" />
-            <Field name="server.fqdn" pattern="%host{fqdn}" />
-
-
-            <!-- This is a static field -->
-            <Field name="fieldName2" literal="fieldValue2" />
-
-            <!-- This is a field using MDC -->
-            <Field name="mdcField2" mdc="mdcField2" />
-            <DynamicMdcFields regex="mdc.*" />
-            <DynamicMdcFields regex="(mdc|MDC)fields" />
-        </Gelf>
-        <Console name="Console" target="SYSTEM_OUT">
-            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
-        </Console>
-    </Appenders>
-    <Loggers>
-        <Root level="INFO">
-            <AppenderRef ref="gelf" />
-            <AppenderRef ref="Console"/>
-        </Root>
-    </Loggers>
-</Configuration>
-```
-Sample spring logback configuration:
-```xml
-<!DOCTYPE configuration>
-
-<configuration>
-    <include resource="org/springframework/boot/logging/logback/base.xml"/>
-    <contextName>test</contextName>
-    <jmxConfigurator/>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
-            <pattern>
-                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
-            </pattern>
-        </encoder>
-    </appender>
-    <appender name="gelfUdp" class="biz.paluch.logging.gelf.logback.GelfLogbackAppender">
-        <host>udp:localhost</host>
-        <port>12201</port>
-        <version>1.1</version>
-        <facility>java-test</facility>
-        <extractStackTrace>true</extractStackTrace>
-        <filterStackTrace>true</filterStackTrace>
-        <mdcProfiling>false</mdcProfiling>
-        <timestampPattern>yyyy-MM-dd HH:mm:ss,SSS</timestampPattern>
-        <maximumMessageSize>8192</maximumMessageSize>
-        <!-- This are static fields -->
-        <additionalFields>fieldName1=fieldValue1,fieldName2=fieldValue2</additionalFields>
-        <!-- Optional: Specify field types -->
-        <additionalFieldTypes>fieldName1=String,fieldName2=Double,fieldName3=Long</additionalFieldTypes>
-
-        <!-- This are fields using MDC -->
-        <mdcFields>mdcField1,mdcField2</mdcFields>
-        <dynamicMdcFields>mdc.*,(mdc|MDC)fields</dynamicMdcFields>
-        <includeFullMdc>true</includeFullMdc>
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>ALL</level>
-        </filter>
-    </appender>
-
-    <appender name="gelfKafka" class="biz.paluch.logging.gelf.logback.GelfLogbackAppender">
-        <host>kafka://localhost:9092?acks=all&amp;max.block.ms=1000&amp;transaction.timeout.ms=1000&amp;request.timeout.ms=1000#kafka-log-topic</host>
-        <port></port>
-        <version>1.1</version>
-        <facility>java-test</facility>
-        <extractStackTrace>true</extractStackTrace>
-        <filterStackTrace>true</filterStackTrace>
-        <mdcProfiling>false</mdcProfiling>
-        <timestampPattern>yyyy-MM-dd HH:mm:ss,SSS</timestampPattern>
-        <maximumMessageSize>8192</maximumMessageSize>
-
-        <!-- This are static fields -->
-        <additionalFields>fieldName1=fieldValue1,fieldName2=fieldValue2</additionalFields>
-        <!-- Optional: Specify field types -->
-        <additionalFieldTypes>fieldName1=String,fieldName2=Double,fieldName3=Long</additionalFieldTypes>
-
-        <!-- This are fields using MDC -->
-        <mdcFields>mdcField1,mdcField2</mdcFields>
-        <dynamicMdcFields>mdc.*,(mdc|MDC)fields</dynamicMdcFields>
-        <includeFullMdc>true</includeFullMdc>
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>INFO</level>
-        </filter>
-    </appender>
-
-    <root level="INFO">
-        <appender-ref ref="gelfKafka" />
-    </root>
-    <logger name="org.apache.kafka" level="ALL" additivity="false">
-        <appender-ref ref="gelfUdp" />
-        <appender-ref ref="STDOUT" />
-    </logger>
-    <logger name="javax.management" level="ALL" additivity="false">
-        <appender-ref ref="gelfUdp" />
-        <appender-ref ref="STDOUT" />
-    </logger>
-</configuration>
-```
-**Limitations**
-
-Some configurations will be overridden or set by default:
-- acks (If you set it to 0 it will be set to 1 by default for log message acknowledgements) defaultValue = all
-- retries defaultValue=2
-- value.serializer ByteArraySerializer (will always override)
-- key.serializer ByteArraySerializer (will always override)
-
-**When using with SL4J/Logback/Spring:**
-
-When you are using logback/sl4j/spring you must not use kafka sender for loggers of `org.apache.kafka` and `javax.management`
-packages this will create a cyclic dependency for [KafkaProducer](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) which is also using loggers from these package.
-You can use other logstash-gelf appender such as UDP or TCP to log these logs to Graylog.
-
-You can set these loggers by changing appender-ref this way:
-```xml
-<logger name="org.apache.kafka" level="ALL" additivity="false">
-    <appender-ref ref="gelfUdp" />
-    <appender-ref ref="STDOUT" />
-</logger>
-<logger name="javax.management" level="ALL" additivity="false">
-    <appender-ref ref="gelfUdp" />
-    <appender-ref ref="STDOUT" />
-</logger>
-```
 Versions/Dependencies
 ---------------------
 This project is built against following dependencies/versions:
pom.xml
Lines changed: 12 additions & 8 deletions

@@ -86,6 +86,7 @@
     <maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
     <maven.jacoco.version>0.7.9</maven.jacoco.version>
     <kafka-clients.version>2.1.0</kafka-clients.version>
+    <kafka-junit.version>4.1.3</kafka-junit.version>
 </properties>

 <developers>
@@ -392,13 +393,7 @@
         <version>${jackson.version}</version>
         <scope>test</scope>
     </dependency>
-    <!-- kafka-junit dependency -->
-    <dependency>
-        <groupId>com.github.charithe</groupId>
-        <artifactId>kafka-junit</artifactId>
-        <version>4.1.3</version>
-        <scope>test</scope>
-    </dependency>
+

     <!-- Redis Client -->
     <dependency>
@@ -407,6 +402,7 @@
         <version>${jedis.version}</version>
         <optional>true</optional>
     </dependency>
+
     <!-- Kafka Client -->
     <dependency>
         <groupId>org.apache.kafka</groupId>
@@ -415,6 +411,14 @@
         <optional>true</optional>
     </dependency>

+    <!-- kafka-junit dependency -->
+    <dependency>
+        <groupId>com.github.charithe</groupId>
+        <artifactId>kafka-junit</artifactId>
+        <version>${kafka-junit.version}</version>
+        <scope>test</scope>
+    </dependency>
+
     <dependency>
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-pool2</artifactId>
@@ -707,7 +711,7 @@
     <classpathScope>test</classpathScope>
     <arguments>
         <argument>-classpath</argument>
-        <classpath />
+        <classpath/>
         <argument>org.openjdk.jmh.Main</argument>
         <argument>-f</argument>
         <argument>1</argument>

src/main/java/biz/paluch/logging/gelf/intern/GelfSenderFactory.java
Lines changed: 8 additions & 7 deletions

@@ -1,27 +1,28 @@
 package biz.paluch.logging.gelf.intern;

-import biz.paluch.logging.gelf.intern.sender.DefaultGelfSenderProvider;
-import biz.paluch.logging.gelf.intern.sender.KafkaGelfSenderProvider;
-import biz.paluch.logging.gelf.intern.sender.RedisGelfSenderProvider;
-
 import java.io.IOException;
 import java.net.SocketException;
 import java.net.UnknownHostException;
 import java.util.*;

+import biz.paluch.logging.gelf.intern.sender.DefaultGelfSenderProvider;
+import biz.paluch.logging.gelf.intern.sender.KafkaGelfSenderProvider;
+import biz.paluch.logging.gelf.intern.sender.RedisGelfSenderProvider;
+
 /**
  * Factory to create a {@link GelfSender} based on the host and protocol details. This factory uses Java's {@link ServiceLoader}
  * mechanism to discover classes implementing {@link GelfSenderProvider}.
- *
+ *
  * @author Mark Paluch
  * @author Aleksandar Stojadinovic
+ * @author Rifat Döver
  * @since 26.09.13 15:12
  */
 public final class GelfSenderFactory {

     /**
      * Create a GelfSender based on the configuration.
-     *
+     *
      * @param hostAndPortProvider the host and port
      * @param errorReporter the error reporter
      * @param senderSpecificConfigurations configuration map
@@ -58,7 +59,7 @@ public Map<String, Object> getSpecificConfigurations() {

     /**
      * Create a GelfSender based on the configuration.
-     *
+     *
      * @param senderConfiguration the configuration
      * @return a new {@link GelfSender} instance
      */
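The Javadoc above references Java's `ServiceLoader` discovery of `GelfSenderProvider` implementations. A minimal sketch of that lookup, assuming the provider interface exposes `supports(String)` and `create(...)` (the diff shows only imports and Javadoc, so these method names are an assumption):

```java
import java.util.ServiceLoader;

import biz.paluch.logging.gelf.intern.GelfSender;
import biz.paluch.logging.gelf.intern.GelfSenderConfiguration;
import biz.paluch.logging.gelf.intern.GelfSenderProvider;

class ProviderLookupSketch {

    // Sketch: ServiceLoader reads META-INF/services registrations and yields
    // provider instances; the first provider that supports the host wins.
    static GelfSender createSender(GelfSenderConfiguration configuration) throws Exception {
        for (GelfSenderProvider provider : ServiceLoader.load(GelfSenderProvider.class)) {
            if (provider.supports(configuration.getHost())) {
                return provider.create(configuration);
            }
        }
        throw new IllegalStateException("No GelfSenderProvider for host: " + configuration.getHost());
    }
}
```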

src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaContants.java
Lines changed: 5 additions & 1 deletion

@@ -2,8 +2,12 @@

 /**
  * @author Rifat Döver
+ * @since 1.13
  */
 public class KafkaContants {
+
     public static final String KAFKA_SCHEME = "kafka";
-    public static final String KAFKA_LOG_TOPIC = "kafka.log.topic";
+
+    private KafkaContants() {
+    }
 }
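For context, an illustrative check of a connection URI against the remaining `KAFKA_SCHEME` constant; the surrounding dispatch logic is assumed, not shown in this diff:

```java
import java.net.URI;

import biz.paluch.logging.gelf.intern.sender.KafkaContants;

class SchemeCheckSketch {

    public static void main(String[] args) {
        // Illustrative: decide whether a connection URI should route to the Kafka sender.
        URI target = URI.create("kafka://localhost:9092#topic-log");
        System.out.println(KafkaContants.KAFKA_SCHEME.equals(target.getScheme())); // true
    }
}
```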

src/main/java/biz/paluch/logging/gelf/intern/sender/KafkaGelfSender.java
Lines changed: 14 additions & 11 deletions

@@ -1,24 +1,27 @@
 package biz.paluch.logging.gelf.intern.sender;

-import biz.paluch.logging.gelf.intern.ErrorReporter;
-import biz.paluch.logging.gelf.intern.GelfMessage;
-import biz.paluch.logging.gelf.intern.GelfSender;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;

-import java.util.concurrent.Future;
+import biz.paluch.logging.gelf.intern.ErrorReporter;
+import biz.paluch.logging.gelf.intern.GelfMessage;
+import biz.paluch.logging.gelf.intern.GelfSender;

 /**
- * Gelf sender for kafka. This sender uses Kafka.
+ * {@link GelfSender} using Kafka.
  *
  * @author Rifat Döver
  * @since 1.13
  */
 public class KafkaGelfSender implements GelfSender {
-    private KafkaProducer<byte[], byte[]> kafkaProducer;
-    private String topicName;
-    private ErrorReporter errorReporter;
+
+    private final KafkaProducer<byte[], byte[]> kafkaProducer;
+    private final String topicName;
+    private final ErrorReporter errorReporter;

     public KafkaGelfSender(KafkaProducer<byte[], byte[]> kafkaProducer, String topicName, ErrorReporter errorReporter) {
         this.kafkaProducer = kafkaProducer;
@@ -29,15 +32,15 @@ public KafkaGelfSender(KafkaProducer<byte[], byte[]> kafkaProducer, String topic
     @Override
     public boolean sendMessage(GelfMessage message) {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, message.toJson().getBytes());
-        boolean hasOfset;
+        boolean hasOffset;
         try {
             Future<RecordMetadata> metadata = kafkaProducer.send(record);
-            hasOfset = metadata.get().hasOffset();
+            hasOffset = metadata.get(30, TimeUnit.SECONDS).hasOffset();
         } catch (Exception e) {
             errorReporter.reportError("Error sending log to kafka", e);
             return false;
         }
-        return hasOfset;
+        return hasOffset;
     }

     @Override
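The substantive change here is the bounded wait on the producer's send future: an unbounded `Future.get()` could block the logging thread indefinitely if the broker is unreachable. A minimal standalone sketch of the same pattern, with the producer and topic wiring assumed; the 30-second bound mirrors the diff and the error handling is simplified:

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class BoundedSendSketch {

    // Sketch: wait at most 30 seconds for the broker acknowledgement instead of
    // blocking indefinitely. A TimeoutException (caught below as Exception, as the
    // real sender does) makes the send fail fast rather than stall logging.
    static boolean sendWithTimeout(KafkaProducer<byte[], byte[]> producer, String topic, byte[] payload) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, payload);
        try {
            Future<RecordMetadata> future = producer.send(record);
            return future.get(30, TimeUnit.SECONDS).hasOffset();
        } catch (Exception e) {
            return false; // the real sender reports via ErrorReporter before returning false
        }
    }
}
```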
