Skip to content

Commit 1844cbc

Browse files
authored
feat: add timestamp variable (#110)
1 parent c645d02 commit 1844cbc

File tree

31 files changed

+269
-318
lines changed

31 files changed

+269
-318
lines changed

common/src/main/resources/neo4j-sink-configuration.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ neo4j.pattern.relationship.merge-properties=Type: Boolean;\nDescription: If enab
2727
using Sink `Relationship pattern` strategy (default false)
2828
neo4j.batch-size=Type: Integer;\nDescription: Max number of messages processed per batch.
2929
neo4j.batch-timeout=Type: Duration;\nDescription: Maximum amount of time a batch is allowed to be processed.
30+
neo4j.cypher.bind-timestamp-as=Type: String;\nDescription: Under what name message timestamp will be bound in user provided Cypher statements (default __timestamp).
3031
neo4j.cypher.bind-header-as=Type: String;\nDescription: Under what name message header will be bound in user provided Cypher statements (default __header).
3132
neo4j.cypher.bind-key-as=Type: String;\nDescription: Under what name message key will be bound in user provided Cypher statements (default __key).
3233
neo4j.cypher.bind-value-as=Type: String;\nDescription: Under what name message value will be bound in user provided Cypher statements (default __value).

legacy-connectors/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,6 @@
3333
<artifactId>testing</artifactId>
3434
<scope>test</scope>
3535
</dependency>
36-
<dependency>
37-
<groupId>ch.qos.logback</groupId>
38-
<artifactId>logback-classic</artifactId>
39-
<scope>test</scope>
40-
</dependency>
4136
<dependency>
4237
<groupId>io.confluent</groupId>
4338
<artifactId>kafka-connect-avro-converter</artifactId>
@@ -53,6 +48,11 @@
5348
<artifactId>junit-jupiter</artifactId>
5449
<scope>test</scope>
5550
</dependency>
51+
<dependency>
52+
<groupId>org.slf4j</groupId>
53+
<artifactId>slf4j-simple</artifactId>
54+
<scope>test</scope>
55+
</dependency>
5656
</dependencies>
5757
<build>
5858
<plugins>

legacy-connectors/src/test/resources/logback.xml

Lines changed: 0 additions & 32 deletions
This file was deleted.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
org.slf4j.simpleLogger.defaultLogLevel=info
2+
org.slf4j.simpleLogger.showDateTime=true
3+
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd'T'HH:mm:ss.SSS
4+
org.slf4j.simpleLogger.log.streams.kafka.connect=debug
5+
org.slf4j.simpleLogger.log.org.neo4j.connectors=debug
6+
org.slf4j.simpleLogger.log.org.apache.kafka=error
7+
org.slf4j.simpleLogger.log.io.confluent.kafka=error

pom.xml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
<license-maven-plugin.version>4.5</license-maven-plugin.version>
6767
<licensing-maven-plugin.version>1.7.11</licensing-maven-plugin.version>
6868
<licensing.prepend.text>/license/neo4j_apache_v2/notice.txt</licensing.prepend.text>
69-
<logback.version>1.5.6</logback.version>
7069
<maven-clean-plugin.version>3.3.2</maven-clean-plugin.version>
7170
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
7271
<maven-dependency-plugin.version>3.6.1</maven-dependency-plugin.version>
@@ -146,11 +145,6 @@
146145
<artifactId>testing</artifactId>
147146
<version>${project.version}</version>
148147
</dependency>
149-
<dependency>
150-
<groupId>ch.qos.logback</groupId>
151-
<artifactId>logback-classic</artifactId>
152-
<version>${logback.version}</version>
153-
</dependency>
154148
<dependency>
155149
<!-- dependency of confluent protobuf converter -->
156150
<groupId>com.google.protobuf</groupId>
@@ -296,7 +290,7 @@
296290
</dependency>
297291
<dependency>
298292
<groupId>org.slf4j</groupId>
299-
<artifactId>slf4j-nop</artifactId>
293+
<artifactId>slf4j-simple</artifactId>
300294
<version>${slf4j.version}</version>
301295
</dependency>
302296
<dependency>

sink-connector/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@
2828
<artifactId>testing</artifactId>
2929
<scope>test</scope>
3030
</dependency>
31-
<dependency>
32-
<groupId>ch.qos.logback</groupId>
33-
<artifactId>logback-classic</artifactId>
34-
<scope>test</scope>
35-
</dependency>
3631
<dependency>
3732
<groupId>io.confluent</groupId>
3833
<artifactId>kafka-connect-avro-converter</artifactId>
@@ -48,6 +43,11 @@
4843
<artifactId>junit-jupiter</artifactId>
4944
<scope>test</scope>
5045
</dependency>
46+
<dependency>
47+
<groupId>org.slf4j</groupId>
48+
<artifactId>slf4j-simple</artifactId>
49+
<scope>test</scope>
50+
</dependency>
5151
</dependencies>
5252
<build>
5353
<plugins>

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jConnectorTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class Neo4jConnectorTest {
106106
connector.validate(
107107
mutableMapOf(
108108
Neo4jConfiguration.AUTHENTICATION_TYPE to "NONE",
109+
SinkConfiguration.CYPHER_BIND_TIMESTAMP_AS to "",
109110
SinkConfiguration.CYPHER_BIND_HEADER_AS to "",
110111
SinkConfiguration.CYPHER_BIND_KEY_AS to "",
111112
SinkConfiguration.CYPHER_BIND_VALUE_AS to "",
@@ -116,6 +117,7 @@ class Neo4jConnectorTest {
116117
.filter {
117118
it.name() in
118119
listOf(
120+
SinkConfiguration.CYPHER_BIND_TIMESTAMP_AS,
119121
SinkConfiguration.CYPHER_BIND_HEADER_AS,
120122
SinkConfiguration.CYPHER_BIND_KEY_AS,
121123
SinkConfiguration.CYPHER_BIND_VALUE_AS,

sink-connector/src/test/kotlin/org/neo4j/connectors/kafka/sink/Neo4jCypherIT.kt

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import io.kotest.assertions.nondeterministic.eventually
2222
import io.kotest.matchers.collections.shouldContainExactly
2323
import io.kotest.matchers.should
2424
import io.kotest.matchers.shouldBe
25+
import java.time.Instant
2526
import java.time.LocalDate
2627
import java.time.LocalDateTime
2728
import java.time.LocalTime
2829
import java.time.OffsetTime
2930
import java.time.ZoneId
3031
import java.time.ZoneOffset
3132
import java.time.ZonedDateTime
33+
import java.time.temporal.ChronoUnit
3234
import java.util.stream.Stream
3335
import kotlin.time.Duration.Companion.seconds
3436
import org.apache.kafka.connect.data.Schema
@@ -615,28 +617,37 @@ abstract class Neo4jCypherIT {
615617
CREATE (p:Person) SET
616618
p.id = __key,
617619
p.name = __value.firstName, p.surname = __value.lastName,
618-
p.createdBy = __header.createdBy
620+
p.createdBy = __header.createdBy,
621+
p.createdAt = __timestamp
619622
""")])
620623
@Test
621-
fun `should get message value from default header, key and value binding`(
624+
fun `should get message value from default timestamp, header, key and value binding`(
622625
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
623626
session: Session
624627
) = runTest {
628+
val timestamp = Instant.now().truncatedTo(ChronoUnit.MILLIS)
629+
625630
producer.publish(
626631
headers = mapOf("createdBy" to "john-doe"),
627632
keySchema = Schema.INT64_SCHEMA,
628633
key = 64L,
629634
valueSchema = Schema.STRING_SCHEMA,
630635
value =
631-
ObjectMapper().writeValueAsString(mapOf("firstName" to "john", "lastName" to "doe")))
636+
ObjectMapper().writeValueAsString(mapOf("firstName" to "john", "lastName" to "doe")),
637+
timestamp = timestamp)
632638

633639
eventually(30.seconds) { session.run("MATCH (n:Person) RETURN n", emptyMap()).single() }
634640
.get("n")
635641
.asNode() should
636642
{
637643
it.labels() shouldBe listOf("Person")
638644
it.asMap() shouldBe
639-
mapOf("id" to 64L, "name" to "john", "surname" to "doe", "createdBy" to "john-doe")
645+
mapOf(
646+
"id" to 64L,
647+
"name" to "john",
648+
"surname" to "doe",
649+
"createdBy" to "john-doe",
650+
"createdAt" to timestamp.atZone(ZoneOffset.UTC))
640651
}
641652
}
642653

@@ -649,31 +660,41 @@ abstract class Neo4jCypherIT {
649660
CREATE (p:Person) SET
650661
p.id = custom_key,
651662
p.name = custom_value.firstName, p.surname = custom_value.lastName,
652-
p.createdBy = custom_header.createdBy
663+
p.createdBy = custom_header.createdBy,
664+
p.createdAt = custom_timestamp
653665
""",
666+
bindTimestampAs = "custom_timestamp",
654667
bindHeaderAs = "custom_header",
655668
bindKeyAs = "custom_key",
656669
bindValueAs = "custom_value")])
657670
@Test
658-
fun `should get message value from custom header, key and value binding`(
671+
fun `should get message value from custom timestamp, header, key and value binding`(
659672
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
660673
session: Session
661674
) = runTest {
675+
val timestamp = Instant.now().truncatedTo(ChronoUnit.MILLIS)
676+
662677
producer.publish(
663678
headers = mapOf("createdBy" to "john-doe"),
664679
keySchema = Schema.INT64_SCHEMA,
665680
key = 64L,
666681
valueSchema = Schema.STRING_SCHEMA,
667682
value =
668-
ObjectMapper().writeValueAsString(mapOf("firstName" to "john", "lastName" to "doe")))
683+
ObjectMapper().writeValueAsString(mapOf("firstName" to "john", "lastName" to "doe")),
684+
timestamp = timestamp)
669685

670686
eventually(30.seconds) { session.run("MATCH (n:Person) RETURN n", emptyMap()).single() }
671687
.get("n")
672688
.asNode() should
673689
{
674690
it.labels() shouldBe listOf("Person")
675691
it.asMap() shouldBe
676-
mapOf("id" to 64L, "name" to "john", "surname" to "doe", "createdBy" to "john-doe")
692+
mapOf(
693+
"id" to 64L,
694+
"name" to "john",
695+
"surname" to "doe",
696+
"createdBy" to "john-doe",
697+
"createdAt" to timestamp.atZone(ZoneOffset.UTC))
677698
}
678699
}
679700

sink-connector/src/test/resources/logback.xml

Lines changed: 0 additions & 31 deletions
This file was deleted.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
org.slf4j.simpleLogger.defaultLogLevel=info
2+
org.slf4j.simpleLogger.showDateTime=true
3+
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd'T'HH:mm:ss.SSS
4+
org.slf4j.simpleLogger.log.org.neo4j.connectors=debug
5+
org.slf4j.simpleLogger.log.org.apache.kafka=error
6+
org.slf4j.simpleLogger.log.io.confluent.kafka=error

0 commit comments

Comments
 (0)