Skip to content

Commit c36d12d

Browse files
committed
feat: support session injection for sink tests
1 parent ea298e1 commit c36d12d

File tree

4 files changed

+174
-15
lines changed

4 files changed

+174
-15
lines changed

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSink.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.neo4j.connectors.kafka.testing.source.DEFAULT_TO_ENV
2424
@ExtendWith(Neo4jSinkExtension::class)
2525
annotation class Neo4jSink(
2626
val kafkaConnectExternalUri: String = DEFAULT_TO_ENV,
27+
val neo4jExternalUri: String = DEFAULT_TO_ENV,
2728
val neo4jUri: String = DEFAULT_TO_ENV,
2829
val neo4jUser: String = DEFAULT_TO_ENV,
2930
val neo4jPassword: String = DEFAULT_TO_ENV,

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtension.kt

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,42 @@ import org.junit.jupiter.api.extension.ConditionEvaluationResult
2222
import org.junit.jupiter.api.extension.ExecutionCondition
2323
import org.junit.jupiter.api.extension.ExtensionConfigurationException
2424
import org.junit.jupiter.api.extension.ExtensionContext
25+
import org.junit.jupiter.api.extension.ParameterContext
26+
import org.junit.jupiter.api.extension.ParameterResolver
2527
import org.neo4j.connectors.kafka.testing.AnnotationSupport
2628
import org.neo4j.connectors.kafka.testing.EnvBackedSetting
2729
import org.neo4j.connectors.kafka.testing.WordSupport.pluralize
30+
import org.neo4j.driver.AuthToken
31+
import org.neo4j.driver.AuthTokens
32+
import org.neo4j.driver.Driver
33+
import org.neo4j.driver.GraphDatabase
34+
import org.neo4j.driver.Session
2835

2936
internal class Neo4jSinkExtension(
3037
// visible for testing
31-
envAccessor: (String) -> String? = System::getenv
32-
) : ExecutionCondition, BeforeEachCallback, AfterEachCallback {
38+
envAccessor: (String) -> String? = System::getenv,
39+
private val driverFactory: (String, AuthToken) -> Driver = GraphDatabase::driver
40+
) : ExecutionCondition, BeforeEachCallback, AfterEachCallback, ParameterResolver {
3341

3442
private lateinit var sinkAnnotation: Neo4jSink
43+
3544
private lateinit var sink: Neo4jSinkRegistration
3645

46+
private lateinit var driver: Driver
47+
48+
private lateinit var session: Session
49+
3750
private val kafkaConnectExternalUri =
3851
EnvBackedSetting<Neo4jSink>(
3952
"kafkaConnectExternalUri", { it.kafkaConnectExternalUri }, envAccessor)
53+
4054
private val neo4jUri = EnvBackedSetting<Neo4jSink>("neo4jUri", { it.neo4jUri }, envAccessor)
55+
56+
private val neo4jExternalUri =
57+
EnvBackedSetting<Neo4jSink>("neo4jExternalUri", { it.neo4jExternalUri }, envAccessor)
58+
4159
private val neo4jUser = EnvBackedSetting<Neo4jSink>("neo4jUser", { it.neo4jUser }, envAccessor)
60+
4261
private val neo4jPassword =
4362
EnvBackedSetting<Neo4jSink>("neo4jPassword", { it.neo4jPassword }, envAccessor)
4463

@@ -77,6 +96,10 @@ internal class Neo4jSinkExtension(
7796
}
7897

7998
override fun beforeEach(extensionContext: ExtensionContext?) {
99+
if (::driver.isInitialized) {
100+
driver.verifyConnectivity()
101+
}
102+
80103
sink =
81104
Neo4jSinkRegistration(
82105
neo4jUri = neo4jUri.read(sinkAnnotation),
@@ -87,6 +110,27 @@ internal class Neo4jSinkExtension(
87110
}
88111

89112
override fun afterEach(extensionContent: ExtensionContext?) {
113+
if (::driver.isInitialized) {
114+
session.close()
115+
driver.close()
116+
}
90117
sink.unregister()
91118
}
119+
120+
override fun supportsParameter(
121+
parameterContext: ParameterContext?,
122+
p1: ExtensionContext?
123+
): Boolean {
124+
return parameterContext?.parameter?.type == Session::class.java
125+
}
126+
127+
override fun resolveParameter(parameterContext: ParameterContext?, p1: ExtensionContext?): Any {
128+
val uri = neo4jExternalUri.read(sinkAnnotation)
129+
val username = neo4jUser.read(sinkAnnotation)
130+
val password = neo4jPassword.read(sinkAnnotation)
131+
driver = driverFactory(uri, AuthTokens.basic(username, password))
132+
// TODO: handle multiple parameter injection
133+
session = driver.session()
134+
return session
135+
}
92136
}

testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/source/Neo4jSourceExtension.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,29 @@ internal class Neo4jSourceExtension(
5656
private lateinit var source: Neo4jSourceRegistration
5757

5858
private lateinit var driver: Driver
59+
5960
private lateinit var session: Session
6061

6162
private val brokerExternalHost =
6263
EnvBackedSetting<Neo4jSource>("brokerExternalHost", { it.brokerExternalHost })
64+
6365
private val schemaRegistryUri =
6466
EnvBackedSetting<Neo4jSource>("schemaControlRegistryUri", { it.schemaControlRegistryUri })
67+
6568
private val schemaRegistryExternalUri =
6669
EnvBackedSetting<Neo4jSource>(
6770
"schemaControlRegistryExternalUri", { it.schemaControlRegistryExternalUri })
71+
6872
private val kafkaConnectExternalUri =
6973
EnvBackedSetting<Neo4jSource>("kafkaConnectExternalUri", { it.kafkaConnectExternalUri })
74+
7075
private val neo4jUri = EnvBackedSetting<Neo4jSource>("neo4jUri", { it.neo4jUri })
76+
7177
private val neo4jExternalUri =
7278
EnvBackedSetting<Neo4jSource>("neo4jExternalUri", { it.neo4jExternalUri })
79+
7380
private val neo4jUser = EnvBackedSetting<Neo4jSource>("neo4jUser", { it.neo4jUser })
81+
7482
private val neo4jPassword = EnvBackedSetting<Neo4jSource>("neo4jPassword", { it.neo4jPassword })
7583

7684
private val envSettings =

testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/sink/Neo4jSinkExtensionTest.kt

Lines changed: 119 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,38 @@
1616
*/
1717
package org.neo4j.connectors.kafka.testing.sink
1818

19+
import com.sun.net.httpserver.HttpExchange
20+
import java.lang.reflect.Parameter
1921
import java.util.concurrent.atomic.AtomicBoolean
20-
import kotlin.reflect.KFunction0
22+
import kotlin.reflect.KClass
23+
import kotlin.reflect.KFunction
2124
import kotlin.reflect.jvm.javaMethod
2225
import kotlin.test.assertContains
2326
import kotlin.test.assertEquals
2427
import kotlin.test.assertFailsWith
2528
import kotlin.test.assertFalse
2629
import kotlin.test.assertIs
30+
import kotlin.test.assertSame
2731
import kotlin.test.assertTrue
2832
import org.junit.jupiter.api.AfterEach
2933
import org.junit.jupiter.api.Test
3034
import org.junit.jupiter.api.extension.ConditionEvaluationResult
3135
import org.junit.jupiter.api.extension.ExtensionConfigurationException
3236
import org.junit.jupiter.api.extension.ExtensionContext
37+
import org.junit.jupiter.api.extension.ParameterContext
38+
import org.mockito.kotlin.doAnswer
3339
import org.mockito.kotlin.doReturn
40+
import org.mockito.kotlin.inOrder
3441
import org.mockito.kotlin.mock
42+
import org.mockito.kotlin.verify
3543
import org.neo4j.connectors.kafka.testing.KafkaConnectServer
44+
import org.neo4j.driver.Driver
45+
import org.neo4j.driver.Session
3646

3747
class Neo4jSinkExtensionTest {
3848

3949
private val extension = Neo4jSinkExtension()
50+
4051
private val kafkaConnectServer = KafkaConnectServer()
4152

4253
@AfterEach
@@ -58,14 +69,12 @@ class Neo4jSinkExtensionTest {
5869
kafkaConnectServer.start(
5970
registrationHandler = { exchange ->
6071
if (!handlerCalled.compareAndSet(false, true)) {
61-
exchange.sendResponseHeaders(500, -1)
62-
exchange.responseBody
63-
.bufferedWriter()
64-
.write("expected handler flag to be initially false")
72+
internalServerError(exchange, "expected handler flag to be initially false")
6573
return@start true
6674
}
6775
return@start false
68-
})
76+
},
77+
)
6978
val environment =
7079
mapOf(
7180
"KAFKA_CONNECT_EXTERNAL_URI" to kafkaConnectServer.address(),
@@ -85,14 +94,12 @@ class Neo4jSinkExtensionTest {
8594
kafkaConnectServer.start(
8695
unregistrationHandler = { exchange ->
8796
if (!handlerCalled.compareAndSet(false, true)) {
88-
exchange.sendResponseHeaders(500, -1)
89-
exchange.responseBody
90-
.bufferedWriter()
91-
.write("expected handler flag to be initially false")
97+
internalServerError(exchange, "expected handler flag to be initially false")
9298
return@start true
9399
}
94100
return@start false
95-
})
101+
},
102+
)
96103
val environment =
97104
mapOf(
98105
"KAFKA_CONNECT_EXTERNAL_URI" to kafkaConnectServer.address(),
@@ -107,6 +114,93 @@ class Neo4jSinkExtensionTest {
107114
assertTrue(handlerCalled.get(), "unregistration should be successful")
108115
}
109116

117+
@Test
118+
fun `supports specific parameters`() {
119+
assertTrue(
120+
extension.supportsParameter(
121+
parameterContextForType(Session::class),
122+
mock<ExtensionContext>(),
123+
),
124+
"session parameters should be resolvable",
125+
)
126+
assertFalse(
127+
extension.supportsParameter(
128+
parameterContextForType(Thread::class),
129+
mock<ExtensionContext>(),
130+
),
131+
"arbitrary parameters should not be supported",
132+
)
133+
}
134+
135+
@Test
136+
fun `resolves Session parameters`() {
137+
val session = mock<Session>()
138+
val driver = mock<Driver> { on { session() } doReturn session }
139+
val extension = Neo4jSinkExtension(driverFactory = { _, _ -> driver })
140+
val extensionContext = extensionContextFor(::validMethod)
141+
extension.evaluateExecutionCondition(extensionContext)
142+
143+
val sessionParam =
144+
extension.resolveParameter(parameterContextForType(Session::class), extensionContext)
145+
146+
assertIs<Session>(sessionParam)
147+
assertSame(session, sessionParam)
148+
}
149+
150+
@Test
151+
fun `verifies connectivity if driver is initialized before each test`() {
152+
kafkaConnectServer.start()
153+
val session = mock<Session>()
154+
val environment =
155+
mapOf(
156+
"KAFKA_CONNECT_EXTERNAL_URI" to kafkaConnectServer.address(),
157+
)
158+
val driver =
159+
mock<Driver> {
160+
on { session() } doReturn session
161+
on { verifyConnectivity() } doAnswer {}
162+
}
163+
val extension =
164+
Neo4jSinkExtension(
165+
envAccessor = environment::get,
166+
driverFactory = { _, _ -> driver },
167+
)
168+
val extensionContext = extensionContextFor(::onlyKafkaExternalUriFromEnvMethod)
169+
extension.evaluateExecutionCondition(extensionContext)
170+
extension.resolveParameter(parameterContextForType(Session::class), extensionContext)
171+
172+
extension.beforeEach(extensionContext)
173+
174+
verify(driver).verifyConnectivity()
175+
}
176+
177+
@Test
178+
fun `closes Driver and Session after each test`() {
179+
kafkaConnectServer.start()
180+
val session = mock<Session>()
181+
val environment =
182+
mapOf(
183+
"KAFKA_CONNECT_EXTERNAL_URI" to kafkaConnectServer.address(),
184+
)
185+
val driver = mock<Driver> { on { session() } doReturn session }
186+
val extension =
187+
Neo4jSinkExtension(
188+
envAccessor = environment::get,
189+
driverFactory = { _, _ -> driver },
190+
)
191+
val extensionContext = extensionContextFor(::onlyKafkaExternalUriFromEnvMethod)
192+
extension.evaluateExecutionCondition(extensionContext)
193+
extension.resolveParameter(parameterContextForType(Session::class), extensionContext)
194+
extension.beforeEach(extensionContext)
195+
196+
extension.afterEach(extensionContext)
197+
198+
inOrder(session, driver) {
199+
verify(session).close()
200+
verify(driver).close()
201+
}
202+
}
203+
110204
@Test
111205
fun `stops execution evaluation if annotation is not found`() {
112206
val exception =
@@ -220,13 +314,22 @@ class Neo4jSinkExtensionTest {
220314
)
221315
}
222316

223-
private fun extensionContextFor(method: KFunction0<Unit>) =
317+
private fun parameterContextForType(parameterType: KClass<*>): ParameterContext {
318+
val param = mock<Parameter> { on { type } doReturn parameterType.java }
319+
return mock<ParameterContext> { on { parameter } doReturn param }
320+
}
321+
322+
private fun extensionContextFor(method: KFunction<Unit>) =
224323
mock<ExtensionContext> { on { requiredTestMethod } doReturn method.javaMethod }
225324

226-
@Suppress("UNUSED") fun missingAnnotationMethod() {}
325+
private fun internalServerError(exchange: HttpExchange, error: String) {
326+
exchange.sendResponseHeaders(500, -1)
327+
exchange.responseBody.bufferedWriter().write(error)
328+
}
227329

228330
@Neo4jSink(
229331
kafkaConnectExternalUri = "http://example.com",
332+
neo4jExternalUri = "neo4j://example.com",
230333
neo4jUri = "neo4j://example.com",
231334
neo4jUser = "user",
232335
neo4jPassword = "password",
@@ -237,6 +340,7 @@ class Neo4jSinkExtensionTest {
237340
fun validMethod() {}
238341

239342
@Neo4jSink(
343+
neo4jExternalUri = "neo4j://example.com",
240344
neo4jUri = "neo4j://example.com",
241345
neo4jUser = "user",
242346
neo4jPassword = "password",
@@ -250,6 +354,8 @@ class Neo4jSinkExtensionTest {
250354
@Suppress("UNUSED")
251355
fun envBackedMethod() {}
252356

357+
@Suppress("UNUSED") fun missingAnnotationMethod() {}
358+
253359
@Neo4jSink(
254360
kafkaConnectExternalUri = "http://example.com",
255361
neo4jUri = "neo4j://example.com",

0 commit comments

Comments
 (0)