Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
retries: 5

zookeeper:
image: confluentinc/cp-zookeeper:6.1.13
image: confluentinc/cp-zookeeper:7.0.11
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -38,7 +38,7 @@ services:
retries: 5

broker:
image: confluentinc/cp-server:6.1.13
image: confluentinc/cp-server:7.0.11
hostname: broker
container_name: broker
depends_on:
Expand Down Expand Up @@ -74,7 +74,7 @@ services:
retries: 5

schema-registry:
image: confluentinc/cp-schema-registry:6.1.13
image: confluentinc/cp-schema-registry:7.0.11
hostname: schema-registry
container_name: schema-registry
depends_on:
Expand All @@ -94,7 +94,7 @@ services:
retries: 5

connect:
image: confluentinc/cp-server-connect:6.1.13
image: confluentinc/cp-server-connect:7.0.11
hostname: connect
container_name: connect
depends_on:
Expand Down Expand Up @@ -138,7 +138,7 @@ services:
retries: 5

control-center:
image: confluentinc/cp-enterprise-control-center:6.1.13
image: confluentinc/cp-enterprise-control-center:7.0.11
hostname: control-center
container_name: control-center
depends_on:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.sink

import io.confluent.connect.avro.AvroData
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.sink.LegacyNeo4jSink
import org.neo4j.connectors.kafka.testing.sink.TopicProducer
import org.neo4j.driver.Session

class LegacyNeo4jSinkIT {

  companion object {
    const val TOPIC = "persons"
  }

  @LegacyNeo4jSink(
      topics = [TOPIC],
      queries =
          [
              "MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})"])
  @Test
  fun `writes messages to Neo4j via legacy sink connector`(
      @TopicProducer producer: KafkaProducer<String, GenericRecord>,
      session: Session,
      testInfo: TestInfo
  ) {
    // Unique marker so the assertion below only matches the node created by this run.
    val executionId = "${testInfo.displayName}${System.currentTimeMillis()}"

    // Build the Avro schema once, then populate a record for it.
    val personSchema =
        SchemaBuilder.struct()
            .field("name", Schema.STRING_SCHEMA)
            .field("surname", Schema.STRING_SCHEMA)
            .field("executionId", Schema.STRING_SCHEMA)
            .build()
    val personRecord =
        GenericData.Record(AvroData(20).fromConnectSchema(personSchema)).apply {
          put("name", "Jane")
          put("surname", "Doe")
          put("executionId", executionId)
        }

    producer.send(ProducerRecord<String, GenericRecord>(TOPIC, personRecord))

    // Poll Neo4j until the sink connector has written exactly one matching node.
    await().atMost(30.seconds.toJavaDuration()).until {
      session
          .run(
              "MATCH (p:Person {name: \$name, surname: \$surname, executionId: \$executionId}) RETURN count(p) = 1 AS result",
              mapOf("name" to "Jane", "surname" to "Doe", "executionId" to executionId))
          .single()["result"]
          .asBoolean()
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Neo4jSinkIT {
[
"MERGE (p:Person {name: event.name, surname: event.surname, executionId: event.executionId})"])
@Test
fun `writes messages to Neo4j`(
fun `writes messages to Neo4j via sink connector`(
@TopicProducer producer: KafkaProducer<String, GenericRecord>,
session: Session,
testInfo: TestInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.source

import java.time.Duration
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
import org.neo4j.connectors.kafka.testing.assertions.TopicVerifier
import org.neo4j.connectors.kafka.testing.source.LegacyNeo4jSource
import org.neo4j.connectors.kafka.testing.source.TopicConsumer
import org.neo4j.driver.Session

class LegacyNeo4jSourceIT {

  companion object {
    const val TOPIC = "neo4j-source-topic"
  }

  @LegacyNeo4jSource(
      topic = TOPIC,
      streamingProperty = "timestamp",
      streamingFrom = "ALL",
      streamingQuery =
          "MATCH (ts:TestSource) WHERE ts.timestamp > \$lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, ts.execId AS execId")
  @Test
  fun `reads latest changes from legacy Neo4j source`(
      testInfo: TestInfo,
      @TopicConsumer(topic = TOPIC, offset = "earliest")
      consumer: KafkaConsumer<String, GenericRecord>,
      session: Session
  ) {
    // Unique marker so the expectations below only match nodes created by this run.
    val executionId = "${testInfo.displayName}${System.currentTimeMillis()}"
    val parameters = mapOf("execId" to executionId)

    // Seed three source nodes; the source connector is expected to publish one message each.
    listOf(
            "CREATE (:TestSource {name: 'jane', surname: 'doe', timestamp: datetime().epochMillis, execId: \$execId})",
            "CREATE (:TestSource {name: 'john', surname: 'doe', timestamp: datetime().epochMillis, execId: \$execId})",
            "CREATE (:TestSource {name: 'mary', surname: 'doe', timestamp: datetime().epochMillis, execId: \$execId})")
        .forEach { statement -> session.run(statement, parameters).consume() }

    // Compare everything except the volatile timestamp field.
    TopicVerifier.create(consumer)
        .expectMessageValueMatching { record ->
          record.asMap().excludingKeys("timestamp") ==
              mapOf("name" to "jane", "surname" to "doe", "execId" to executionId)
        }
        .expectMessageValueMatching { record ->
          record.asMap().excludingKeys("timestamp") ==
              mapOf("name" to "john", "surname" to "doe", "execId" to executionId)
        }
        .expectMessageValueMatching { record ->
          record.asMap().excludingKeys("timestamp") ==
              mapOf("name" to "mary", "surname" to "doe", "execId" to executionId)
        }
        .verifyWithin(Duration.ofSeconds(30))
  }
}

/**
 * Flattens this Avro record into a map keyed by field name, with every value rendered via
 * `toString()`.
 *
 * A `null` field value is rendered as the literal string "null" (mirroring `String.valueOf`)
 * instead of throwing a NullPointerException, since `get(...)` returns a platform type.
 */
// FIXME: properly convert values — toString() is lossy for nested records and logical types
fun GenericRecord.asMap(): Map<String, String> {
  return this.schema.fields.associate { field ->
    field.name() to (this.get(field.name())?.toString() ?: "null")
  }
}

/**
 * Returns a copy of this map without the entries whose keys are listed in [keys].
 *
 * @param keys the keys to remove; every one of them must be present in this map
 * @throws IllegalArgumentException if any of the specified keys is not a key of this map
 */
fun <K, V> Map<K, V>.excludingKeys(vararg keys: K): Map<K, V> {
  // Validate first so callers get a clear error instead of silently ignoring typos.
  val missing = keys.filter { it !in this }
  require(missing.isEmpty()) {
    "Cannot exclude keys ${missing.joinToString()}: they are missing from map $this"
  }
  val excluded = keys.toSet()
  return filterKeys { it !in excluded }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class Neo4jSourceIT {
@Neo4jSource(
topic = TOPIC,
streamingProperty = "timestamp",
streamingFrom = "ALL",
streamingQuery =
startFrom = "EARLIEST",
query =
"MATCH (ts:TestSource) WHERE ts.timestamp > \$lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, ts.execId AS execId")
@Test
fun `reads latest changes from Neo4j source`(
Expand Down Expand Up @@ -79,24 +79,3 @@ class Neo4jSourceIT {
.verifyWithin(Duration.ofSeconds(30))
}
}

/**
 * Flattens this Avro record into a map keyed by field name, with every value rendered via
 * `toString()`.
 *
 * NOTE(review): `get(...)` returns a platform type, so a null field value would NPE on
 * `.toString()` — confirm all schemas used by these tests declare non-null fields.
 */
fun GenericRecord.asMap(): Map<String, String> {
  // FIXME: properly convert values
  return this.schema.fields.associate { field -> field.name() to this.get(field.name()).toString() }
}

/**
 * Returns a copy of this map without the entries whose keys are listed in [keys].
 *
 * @param keys the keys to remove; every one of them must be present in this map
 * @throws IllegalArgumentException if any of the specified keys is not a key of this map
 */
fun <K, V> Map<K, V>.excludingKeys(vararg keys: K): Map<K, V> {
  // Validate first so callers get a clear error instead of silently ignoring typos.
  val missing = keys.filter { !this.keys.contains(it) }
  if (missing.isNotEmpty()) {
    throw IllegalArgumentException(
        "Cannot exclude keys ${missing.joinToString()}: they are missing from map $this")
  }
  val exclusions = setOf(*keys)
  return this.filterKeys { !exclusions.contains(it) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.testing.sink

import org.junit.jupiter.api.extension.ExtendWith
import org.neo4j.connectors.kafka.testing.DEFAULT_TO_ENV

/**
 * Provisions a legacy Neo4j sink connector for the annotated test method via
 * [LegacyNeo4jSinkExtension] (registered through JUnit's `@ExtendWith`).
 *
 * Connection settings default to [DEFAULT_TO_ENV] — presumably resolved from environment
 * variables by the extension; TODO confirm against [LegacyNeo4jSinkExtension].
 *
 * @param topics Kafka topics the sink connector subscribes to (required)
 * @param queries Cypher statements executed for incoming messages — NOTE(review): assumed to be
 *   index-aligned with [topics]; verify in the extension implementation
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
@ExtendWith(LegacyNeo4jSinkExtension::class)
annotation class LegacyNeo4jSink(
    val brokerExternalHost: String = DEFAULT_TO_ENV,
    val kafkaConnectExternalUri: String = DEFAULT_TO_ENV,
    val schemaControlRegistryUri: String = DEFAULT_TO_ENV,
    val schemaControlRegistryExternalUri: String = DEFAULT_TO_ENV,
    val neo4jUri: String = DEFAULT_TO_ENV,
    val neo4jExternalUri: String = DEFAULT_TO_ENV,
    val neo4jUser: String = DEFAULT_TO_ENV,
    val neo4jPassword: String = DEFAULT_TO_ENV,
    val topics: Array<String>,
    val queries: Array<String>
)
Loading