Skip to content
This repository was archived by the owner on Mar 30, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ plugins {
id 'eclipse'
id 'idea'
id 'jacoco'
id 'org.sonarqube' version '2.7.1'
id 'org.sonarqube' version '2.8'
id 'checkstyle'
id 'org.ajoberstar.grgit' version '3.1.1'
id "io.spring.dependency-management" version "1.0.8.RELEASE"
id 'io.spring.dependency-management' version '1.0.8.RELEASE'
}

description = 'Spring Integration Kafka Support'
Expand Down Expand Up @@ -60,7 +60,8 @@ ext {
junitJupiterVersion = '5.5.2'
log4jVersion = '2.12.1'
springIntegrationVersion = '5.2.1.RELEASE'
springKafkaVersion = '2.3.2.RELEASE'
springIntegrationKotlinVersion = '0.0.2.RELEASE'
springKafkaVersion = '2.3.3.RELEASE'

idPrefix = 'kafka'

Expand Down Expand Up @@ -103,19 +104,20 @@ dependencies {

testCompile "org.springframework.kafka:spring-kafka-test:$springKafkaVersion"
testCompile 'org.springframework.integration:spring-integration-test'
testCompile "org.springframework.integration:spring-integration-kotlin-dsl:$springIntegrationKotlinVersion"
testCompile "com.willowtreeapps.assertk:assertk-jvm:$assertkVersion"
testCompile 'org.jetbrains.kotlin:kotlin-reflect'
testCompile 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
testCompile 'org.junit.jupiter:junit-jupiter-api'
testRuntime 'org.junit.jupiter:junit-jupiter-engine'
testRuntime 'org.junit.platform:junit-platform-launcher'

// To avoid compiler warnings about @API annotations in JUnit code
testCompileOnly 'org.apiguardian:apiguardian-api:1.0.0'

testRuntime 'com.fasterxml.jackson.core:jackson-core'
testRuntime 'com.fasterxml.jackson.core:jackson-databind'
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
testRuntime 'org.junit.jupiter:junit-jupiter-engine'
testRuntime 'org.junit.platform:junit-platform-launcher'
}

// enable all compiler warnings; individual projects may customize further
Expand All @@ -134,7 +136,7 @@ test {

checkstyle {
configFile = new File(rootDir, 'src/checkstyle/checkstyle.xml')
toolVersion = '8.24'
toolVersion = '8.25'
}

task updateCopyrights {
Expand Down Expand Up @@ -179,19 +181,19 @@ jacocoTestReport {
build.dependsOn jacocoTestReport

task sourcesJar(type: Jar) {
classifier = 'sources'
archiveClassifier = 'sources'
from sourceSets.main.allJava
}

task javadocJar(type: Jar) {
classifier = 'javadoc'
archiveClassifier = 'javadoc'
from javadoc
}

jar {
manifest {
attributes(
'Implementation-Version': version,
'Implementation-Version': archiveClassifier,
'Created-By': "JDK ${System.properties['java.version']} (${System.properties['java.specification.vendor']})",
'Implementation-Title': project.name,
'Implementation-Vendor-Id': project.group,
Expand Down Expand Up @@ -235,8 +237,8 @@ task api(type: Javadoc) {

task schemaZip(type: Zip) {
group = 'Distribution'
classifier = 'schema'
description = "Builds -${classifier} archive containing all " +
archiveClassifier = 'schema'
description = "Builds -${archiveClassifier} archive containing all " +
"XSDs for deployment at static.springframework.org/schema."

def Properties schemas = new Properties();
Expand All @@ -260,8 +262,8 @@ task schemaZip(type: Zip) {

task docsZip(type: Zip) {
group = 'Distribution'
classifier = 'docs'
description = "Builds -${classifier} archive containing api " +
archiveClassifier = 'docs'
description = "Builds -${archiveClassifier} archive containing api " +
"for deployment at static.spring.io/spring-integration/docs."

from('src/dist') {
Expand All @@ -275,8 +277,8 @@ task docsZip(type: Zip) {

task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) {
group = 'Distribution'
classifier = 'dist'
description = "Builds -${classifier} archive, containing all jars and docs, " +
archiveClassifier = 'dist'
description = "Builds -${archiveClassifier} archive, containing all jars and docs, " +
"suitable for community download page."

ext.baseDir = "${project.name}-${project.version}";
Expand All @@ -288,11 +290,11 @@ task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) {
into "${baseDir}"
}

from(zipTree(docsZip.archivePath)) {
from(zipTree(docsZip.archiveFile)) {
into "${baseDir}/docs"
}

from(zipTree(schemaZip.archivePath)) {
from(zipTree(schemaZip.archiveFile)) {
into "${baseDir}/schema"
}

Expand All @@ -307,9 +309,9 @@ task distZip(type: Zip, dependsOn: [docsZip, schemaZip]) {
// Not published by default; only for use when building from source.
task depsZip(type: Zip, dependsOn: distZip) { zipTask ->
group = 'Distribution'
classifier = 'dist-with-deps'
description = "Builds -${classifier} archive, containing everything " +
"in the -${distZip.classifier} archive plus all dependencies."
archiveClassifier = 'dist-with-deps'
description = "Builds -${archiveClassifier} archive, containing everything " +
"in the -${distZip.archiveClassifier} archive plus all dependencies."

from zipTree(distZip.archivePath)

Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
4 changes: 1 addition & 3 deletions src/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@
<property name="scope" value="package"/>
<property name="authorFormat" value=".+\s.+"/>
</module>
<module name="JavadocMethod">
<property name="allowMissingJavadoc" value="true"/>
</module>
<module name="JavadocMethod"/>
<module name="JavadocVariable">
<property name="scope" value="public"/>
</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor
import org.springframework.integration.MessageRejectedException
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.dsl.integrationFlow
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler
Expand Down Expand Up @@ -87,7 +86,7 @@ import java.util.stream.Stream
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = [KafkaDslKotlinTests.TEST_TOPIC1, KafkaDslKotlinTests.TEST_TOPIC2,
KafkaDslKotlinTests.TEST_TOPIC3, KafkaDslKotlinTests.TEST_TOPIC4, KafkaDslKotlinTests.TEST_TOPIC5])
KafkaDslKotlinTests.TEST_TOPIC3, KafkaDslKotlinTests.TEST_TOPIC4, KafkaDslKotlinTests.TEST_TOPIC5])
class KafkaDslKotlinTests {

companion object {
Expand Down Expand Up @@ -159,8 +158,8 @@ class KafkaDslKotlinTests {
assertThat(receive!!.payload).isEqualTo("FOO")
val headers = receive.headers
assertThat(headers.containsKey(KafkaHeaders.ACKNOWLEDGMENT)).isTrue()
val acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)
acknowledgment?.acknowledge()
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
acknowledgment.acknowledge()
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC1)
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
Expand All @@ -176,8 +175,8 @@ class KafkaDslKotlinTests {
assertThat(receive!!.payload).isEqualTo("FOO")
val headers = receive.headers
assertThat(headers.containsKey(KafkaHeaders.ACKNOWLEDGMENT)).isTrue()
val acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)
acknowledgment?.acknowledge()
val acknowledgment = headers[KafkaHeaders.ACKNOWLEDGMENT] as Acknowledgment
acknowledgment.acknowledge()
assertThat(headers[KafkaHeaders.RECEIVED_TOPIC]).isEqualTo(TEST_TOPIC2)
assertThat(headers[KafkaHeaders.RECEIVED_MESSAGE_KEY]).isEqualTo(i + 1)
assertThat(headers[KafkaHeaders.RECEIVED_PARTITION_ID]).isEqualTo(0)
Expand Down Expand Up @@ -232,7 +231,7 @@ class KafkaDslKotlinTests {
@Bean
fun consumerFactory(): ConsumerFactory<Int, String> {
val props = KafkaTestUtils.consumerProps("test1", "false", this.embeddedKafka)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
return DefaultKafkaConsumerFactory(props)
}

Expand All @@ -241,7 +240,7 @@ class KafkaDslKotlinTests {

@Bean
fun topic1ListenerFromKafkaFlow() =
IntegrationFlows.from(
integrationFlow(
Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer {
Expand All @@ -251,55 +250,52 @@ class KafkaDslKotlinTests {
.recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(),
RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(RetryTemplate())
.filterInRetry(true))
.filter(Message::class.java, { m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer::class.java)!! < 101 },
{ f -> f.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults1") }
.get()
.filterInRetry(true)) {
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { f -> f.throwExceptionOnRejection(true) }
transform<String, String>({ it.toUpperCase() })
channel { c -> c.queue("listeningFromKafkaResults1") }
}

@Bean
fun topic2ListenerFromKafkaFlow() =
IntegrationFlows.from(
integrationFlow(
Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC2)
.configureListenerContainer { it.ackMode(ContainerProperties.AckMode.MANUAL) }
.recoveryCallback(ErrorMessageSendingRecoverer(errorChannel(),
RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(RetryTemplate())
.filterInRetry(true))
.filter(Message::class.java,
{ m -> m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer::class.java)!! < 101 },
{ it.throwExceptionOnRejection(true) })
.transform<String, String> { it.toUpperCase() }
.channel { c -> c.queue("listeningFromKafkaResults2") }
.get()
.filterInRetry(true)) {
filter<Message<*>>({ m -> (m.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] as Int) < 101 }) { it.throwExceptionOnRejection(true) }
transform<String, String>({ it.toUpperCase() })
channel { c -> c.queue("listeningFromKafkaResults2") }
}

@Bean
fun producerFactory(): DefaultKafkaProducerFactory<Int, String> {
val props = KafkaTestUtils.producerProps(this.embeddedKafka)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
props[ProducerConfig.MAX_BLOCK_MS_CONFIG] = "10000"
return DefaultKafkaProducerFactory(props)
}

@Bean
fun sendToKafkaFlow() =
IntegrationFlow { f ->
f.split<String>({ p -> Stream.generate { p }.limit(101) }, null)
.publishSubscribeChannel { c ->
c.subscribe { sf ->
sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')")
integrationFlow {
split<String>({ p -> Stream.generate { p }.limit(101) })
publishSubscribeChannel {
it
.subscribe {
it.handle(kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')")
) { it.id("kafkaProducer1") }
}
.subscribe { sf ->
sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp<Any> { 1487694048644L }
) { it.id("kafkaProducer2") }
}
}
.subscribe {
it.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp<Any> { 1487694048644L }
) { it.id("kafkaProducer2") }
}
}
}

@Bean
Expand All @@ -310,21 +306,20 @@ class KafkaDslKotlinTests {
.messageKey<Any> { m -> m.headers[IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER] }
.headerMapper(mapper())
.sync(true)
.partitionId<Any> { _ -> 0 }
.partitionId<Any> { 0 }
.topicExpression("headers[kafka_topic] ?: '$topic'")
.configureKafkaTemplate { t -> t.id("kafkaTemplate:$topic") }
.configureKafkaTemplate { it.id("kafkaTemplate:$topic") }


@Bean
fun sourceFlow() =
IntegrationFlows
.from(Kafka.inboundChannelAdapter(consumerFactory(), ConsumerProperties(TEST_TOPIC3)))
{ e -> e.poller(Pollers.fixedDelay(100)) }
.handle { p ->
this.fromSource = p.getPayload()
this.sourceFlowLatch.countDown()
}
.get()
integrationFlow(Kafka.inboundChannelAdapter(consumerFactory(), ConsumerProperties(TEST_TOPIC3)),
{ e -> e.poller(Pollers.fixedDelay(100)) }) {
handle { m ->
this@ContextConfiguration.fromSource = m.payload
this@ContextConfiguration.sourceFlowLatch.countDown()
}
}

@Bean
fun replyingKafkaTemplate() =
Expand All @@ -335,10 +330,10 @@ class KafkaDslKotlinTests {

@Bean
fun outboundGateFlow() =
IntegrationFlows.from(Gate::class.java)
.handle(Kafka.outboundGateway(replyingKafkaTemplate())
.sync(true))
.get()
integrationFlow<Gate> {
handle(Kafka.outboundGateway(replyingKafkaTemplate())
.sync(true))
}

private fun replyContainer(): GenericMessageListenerContainer<Int, String> {
val containerProperties = ContainerProperties(TEST_TOPIC5)
Expand All @@ -359,10 +354,9 @@ class KafkaDslKotlinTests {

@Bean
fun serverGateway() =
IntegrationFlows.from(
Kafka.inboundGateway(consumerFactory(), containerProperties(), producerFactory()))
.transform<String, String> { it.toUpperCase() }
.get()
integrationFlow(Kafka.inboundGateway(consumerFactory(), containerProperties(), producerFactory())) {
transform<String, String>({ it.toUpperCase() })
}

private fun containerProperties() =
ContainerProperties(TEST_TOPIC4)
Expand Down