Skip to content

Commit c485157

Browse files
committed
feat: implement sink connector registration
1 parent b60a86d commit c485157

File tree

10 files changed

+423
-77
lines changed

10 files changed

+423
-77
lines changed

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/EnvBackedSetting.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
*/
1717
package org.neo4j.connectors.kafka.testing
1818

19+
import org.neo4j.connectors.kafka.testing.WordSupport.camelCaseToUpperSnakeCase
1920
import org.neo4j.connectors.kafka.testing.source.DEFAULT_TO_ENV
2021

2122
internal class EnvBackedSetting<T : Annotation>(
2223
private val name: String,
23-
private val envVarName: String,
2424
private val getter: (T) -> String,
2525
private val envAccessor: (String) -> String? = System::getenv,
2626
) {
27+
private val envVarName = camelCaseToUpperSnakeCase(name)
2728

2829
fun isValid(annotation: T): Boolean {
2930
return getter(annotation) != DEFAULT_TO_ENV || envAccessor(envVarName) != null
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.testing
18+
19+
import kotlin.math.absoluteValue
20+
21+
object WordSupport {
22+
23+
// implementation note: this is a naive pluralization helper, which is not meant to work outside
24+
// English
25+
fun pluralize(count: Int, singular: String, plural: String): String {
26+
if (count.absoluteValue < 2) {
27+
return singular
28+
}
29+
return plural
30+
}
31+
32+
fun camelCaseToUpperSnakeCase(word: String): String {
33+
fun convertChar(char: Char): List<Char> {
34+
return if (Character.isUpperCase(char)) {
35+
listOf('_', char)
36+
} else listOf(char.uppercaseChar())
37+
}
38+
39+
return word.flatMap { convertChar(it) }.joinToString("")
40+
}
41+
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.neo4j.connectors.kafka.testing.source.DEFAULT_TO_ENV
2323
@Retention(AnnotationRetention.RUNTIME)
2424
@ExtendWith(Neo4jSinkExtension::class)
2525
annotation class Neo4jSink(
26+
val kafkaConnectExternalUri: String = DEFAULT_TO_ENV,
2627
val neo4jUri: String = DEFAULT_TO_ENV,
2728
val neo4jUser: String = DEFAULT_TO_ENV,
2829
val neo4jPassword: String = DEFAULT_TO_ENV,

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,27 @@ import org.junit.jupiter.api.extension.ExtensionConfigurationException
2424
import org.junit.jupiter.api.extension.ExtensionContext
2525
import org.neo4j.connectors.kafka.testing.AnnotationSupport
2626
import org.neo4j.connectors.kafka.testing.EnvBackedSetting
27+
import org.neo4j.connectors.kafka.testing.WordSupport.pluralize
2728

28-
internal class Neo4jSinkExtension(
29+
/*nocommit*/ class Neo4jSinkExtension(
2930
// visible for testing
3031
envAccessor: (String) -> String? = System::getenv
3132
) : ExecutionCondition, BeforeEachCallback, AfterEachCallback {
3233

3334
private lateinit var sinkAnnotation: Neo4jSink
35+
private lateinit var sink: Neo4jSinkRegistration
3436

35-
private val neo4jUri =
36-
EnvBackedSetting<Neo4jSink>("neo4jUri", "NEO4J_URI", { it.neo4jUri }, envAccessor)
37-
private val neo4jUser =
38-
EnvBackedSetting<Neo4jSink>("neo4jUser", "NEO4J_USER", { it.neo4jUser }, envAccessor)
39-
private val neo4jPassword =
37+
private val kafkaConnectExternalUri =
4038
EnvBackedSetting<Neo4jSink>(
41-
"neo4jPassword", "NEO4J_PASSWORD", { it.neo4jPassword }, envAccessor)
39+
"kafkaConnectExternalUri", { it.kafkaConnectExternalUri }, envAccessor)
40+
private val neo4jUri = EnvBackedSetting<Neo4jSink>("neo4jUri", { it.neo4jUri }, envAccessor)
41+
private val neo4jUser = EnvBackedSetting<Neo4jSink>("neo4jUser", { it.neo4jUser }, envAccessor)
42+
private val neo4jPassword =
43+
EnvBackedSetting<Neo4jSink>("neo4jPassword", { it.neo4jPassword }, envAccessor)
4244

4345
private val envSettings =
4446
listOf(
47+
kafkaConnectExternalUri,
4548
neo4jUri,
4649
neo4jUser,
4750
neo4jPassword,
@@ -58,6 +61,11 @@ internal class Neo4jSinkExtension(
5861
errors.add(it.errorMessage())
5962
}
6063
}
64+
val topicCount = metadata.topics.size
65+
if (topicCount != metadata.queries.size) {
66+
errors.add(
67+
"Expected $topicCount ${pluralize(topicCount, "query", "queries")}, but got ${metadata.queries.size}. There must be as many topics as queries defined.")
68+
}
6169
if (errors.isNotEmpty()) {
6270
throw ExtensionConfigurationException(
6371
"\nMissing settings, see details below:\n\t${errors.joinToString("\n\t")}",
@@ -68,11 +76,17 @@ internal class Neo4jSinkExtension(
6876
return ConditionEvaluationResult.enabled("@Neo4jSink and environment properly configured")
6977
}
7078

71-
override fun beforeEach(p0: ExtensionContext?) {
72-
TODO("Not yet implemented")
79+
override fun beforeEach(extensionContext: ExtensionContext?) {
80+
sink =
81+
Neo4jSinkRegistration(
82+
neo4jUri = neo4jUri.read(sinkAnnotation),
83+
neo4jUser = neo4jUser.read(sinkAnnotation),
84+
neo4jPassword = neo4jPassword.read(sinkAnnotation),
85+
topicQuerys = sinkAnnotation.topics.zip(sinkAnnotation.queries).toMap())
86+
sink.register(kafkaConnectExternalUri.read(sinkAnnotation))
7387
}
7488

7589
override fun afterEach(p0: ExtensionContext?) {
76-
TODO("Not yet implemented")
90+
sink.unregister()
7791
}
7892
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkRegistration.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.neo4j.connectors.kafka.testing.RegistrationSupport.randomizedName
2626
class Neo4jSinkRegistration(
2727
topicQuerys: Map<String, String>,
2828
neo4jUri: String,
29-
neo4jUsername: String,
29+
neo4jUser: String,
3030
neo4jPassword: String,
3131
enableKeySchemas: Boolean = false,
3232
enableValueSchemas: Boolean = false,
@@ -59,7 +59,7 @@ class Neo4jSinkRegistration(
5959
"errors.log.enable" to enableErrorLog,
6060
"errors.log.include.messages" to includeMessagesInErrorLog,
6161
"neo4j.server.uri" to neo4jUri,
62-
"neo4j.authentication.basic.username" to neo4jUsername,
62+
"neo4j.authentication.basic.username" to neo4jUser,
6363
"neo4j.authentication.basic.password" to neo4jPassword,
6464
))
6565
.nestUnder("config", queries)
@@ -77,7 +77,7 @@ class Neo4jSinkRegistration(
7777
RegistrationSupport.unregisterConnector(URI("$connectBaseUri/connectors/$name/"))
7878
}
7979

80-
internal fun getPayload(): Map<String, Any> {
80+
/*nocommit*/ fun getPayload(): Map<String, Any> {
8181
return payload
8282
}
8383
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,28 +59,19 @@ internal class Neo4jSourceExtension(
5959
private lateinit var session: Session
6060

6161
private val brokerExternalHost =
62-
EnvBackedSetting<Neo4jSource>(
63-
"brokerExternalHost", "BROKER_EXTERNAL_HOST", { it.brokerExternalHost })
62+
EnvBackedSetting<Neo4jSource>("brokerExternalHost", { it.brokerExternalHost })
6463
private val schemaRegistryUri =
65-
EnvBackedSetting<Neo4jSource>(
66-
"schemaControlRegistryUri",
67-
"SCHEMA_CONTROL_REGISTRY_URI",
68-
{ it.schemaControlRegistryUri })
64+
EnvBackedSetting<Neo4jSource>("schemaControlRegistryUri", { it.schemaControlRegistryUri })
6965
private val schemaRegistryExternalUri =
7066
EnvBackedSetting<Neo4jSource>(
71-
"schemaControlRegistryExternalUri",
72-
"SCHEMA_CONTROL_REGISTRY_EXTERNAL_URI",
73-
{ it.schemaControlRegistryExternalUri })
67+
"schemaControlRegistryExternalUri", { it.schemaControlRegistryExternalUri })
7468
private val kafkaConnectExternalUri =
75-
EnvBackedSetting<Neo4jSource>(
76-
"kafkaConnectExternalUri", "KAFKA_CONNECT_EXTERNAL_URI", { it.kafkaConnectExternalUri })
77-
private val neo4jUri = EnvBackedSetting<Neo4jSource>("neo4jUri", "NEO4J_URI", { it.neo4jUri })
69+
EnvBackedSetting<Neo4jSource>("kafkaConnectExternalUri", { it.kafkaConnectExternalUri })
70+
private val neo4jUri = EnvBackedSetting<Neo4jSource>("neo4jUri", { it.neo4jUri })
7871
private val neo4jExternalUri =
79-
EnvBackedSetting<Neo4jSource>(
80-
"neo4jExternalUri", "NEO4J_EXTERNAL_URI", { it.neo4jExternalUri })
81-
private val neo4jUser = EnvBackedSetting<Neo4jSource>("neo4jUser", "NEO4J_USER", { it.neo4jUser })
82-
private val neo4jPassword =
83-
EnvBackedSetting<Neo4jSource>("neo4jPassword", "NEO4J_PASSWORD", { it.neo4jPassword })
72+
EnvBackedSetting<Neo4jSource>("neo4jExternalUri", { it.neo4jExternalUri })
73+
private val neo4jUser = EnvBackedSetting<Neo4jSource>("neo4jUser", { it.neo4jUser })
74+
private val neo4jPassword = EnvBackedSetting<Neo4jSource>("neo4jPassword", { it.neo4jPassword })
8475

8576
private val envSettings =
8677
listOf(
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.testing
18+
19+
import com.sun.net.httpserver.HttpExchange
20+
import com.sun.net.httpserver.HttpServer
21+
import java.net.InetSocketAddress
22+
23+
class KafkaConnectServer : AutoCloseable {
24+
25+
companion object {
26+
val CANDIDATE_ACCEPT_HEADERS = setOf("*/*", "application/*", "*/json", "application/json")
27+
}
28+
29+
private var started = false
30+
private val httpServer: HttpServer = HttpServer.create(InetSocketAddress(0), 0)
31+
32+
/**
33+
* Starts the fake Kafka Connect server
34+
*
35+
* @param registrationHandler an optional handler to act on the ongoing HTTP exchange for
36+
* connector registration
37+
* @param unregistrationHandler an optional handler to act on the ongoing HTTP exchange for
38+
* connector registration Note that both handlers should return true if
39+
* HttpExchange.sendResponseHeaders is called and false otherwise
40+
*/
41+
fun start(
42+
registrationHandler: (HttpExchange) -> Boolean = {
43+
it.sendResponseHeaders(201, -1)
44+
true
45+
},
46+
unregistrationHandler: (HttpExchange) -> Boolean = {
47+
it.sendResponseHeaders(204, -1)
48+
true
49+
}
50+
) {
51+
52+
httpServer.createContext("/connectors") { exchange ->
53+
val path = exchange.requestURI.path.replaceFirst("/connectors/?([^/]*)/?".toRegex(), "$1")
54+
55+
when {
56+
path.isEmpty() -> handleRegistration(exchange, registrationHandler)
57+
!path.contains("/") -> handleUnregistration(exchange, unregistrationHandler)
58+
else -> exchange.sendResponseHeaders(404, 0)
59+
}
60+
exchange.close()
61+
}
62+
this.started = true
63+
this.httpServer.start()
64+
}
65+
66+
fun address(): String {
67+
return "http://localhost:${httpServer.address.port}"
68+
}
69+
70+
override fun close() {
71+
if (this.started) {
72+
this.started = false
73+
httpServer.stop(0)
74+
}
75+
}
76+
77+
private fun handleRegistration(exchange: HttpExchange, handler: (HttpExchange) -> Boolean) {
78+
if (!validateMethod(exchange, "POST") ||
79+
!validateJsonIn(exchange) ||
80+
!validateJsonOut(exchange)) {
81+
return
82+
}
83+
if (!handler(exchange)) {
84+
exchange.sendResponseHeaders(201, -1)
85+
}
86+
}
87+
88+
private fun handleUnregistration(exchange: HttpExchange, handler: (HttpExchange) -> Boolean) {
89+
if (!validateMethod(exchange, "DELETE") || !validateJsonOut(exchange)) {
90+
return
91+
}
92+
if (!handler(exchange)) {
93+
exchange.sendResponseHeaders(204, -1)
94+
}
95+
}
96+
97+
private fun validateMethod(exchange: HttpExchange, method: String): Boolean {
98+
if (exchange.requestMethod != method) {
99+
exchange.sendResponseHeaders(405, -1)
100+
return false
101+
}
102+
return true
103+
}
104+
105+
private fun validateJsonIn(exchange: HttpExchange): Boolean {
106+
val contentType = exchange.requestHeaders["Content-Type"]!!
107+
if (contentType.size != 1) {
108+
exchange.sendResponseHeaders(400, -1)
109+
exchange.responseBody.bufferedWriter().write("unparseable Content-Type")
110+
return false
111+
}
112+
if (contentType[0] != "application/json") {
113+
exchange.sendResponseHeaders(415, -1)
114+
return false
115+
}
116+
return true
117+
}
118+
119+
private fun validateJsonOut(exchange: HttpExchange): Boolean {
120+
val acceptedMedia = exchange.requestHeaders["Accept"]!!
121+
if (acceptedMedia.isNotEmpty() &&
122+
acceptedMedia.none { CANDIDATE_ACCEPT_HEADERS.contains(it) }) {
123+
exchange.sendResponseHeaders(406, -1)
124+
return false
125+
}
126+
return true
127+
}
128+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.neo4j.connectors.kafka.testing
18+
19+
import org.junit.jupiter.api.Assertions.*
20+
import org.junit.jupiter.api.Test
21+
22+
class WordSupportTest {
23+
24+
@Test
25+
fun pluralizes() {
26+
assertEquals("flowers", WordSupport.pluralize(2, "flower", "flowers"))
27+
assertEquals("alumnus", WordSupport.pluralize(1, "alumnus", "alumni"))
28+
assertEquals("query", WordSupport.pluralize(0, "query", "queries"))
29+
assertEquals("query", WordSupport.pluralize(0, "query", "queries"))
30+
assertEquals("dollar", WordSupport.pluralize(-1, "dollar", "dollars"))
31+
assertEquals("years", WordSupport.pluralize(-2, "year", "years"))
32+
}
33+
34+
@Test
35+
fun `converts camel case to upper snake case`() {
36+
assertEquals("EASY", WordSupport.camelCaseToUpperSnakeCase("easy"))
37+
assertEquals("THIS_IS_BANANAS", WordSupport.camelCaseToUpperSnakeCase("thisIsBananas"))
38+
}
39+
}

0 commit comments

Comments
 (0)