diff --git a/pom.xml b/pom.xml
index 19fbdf557..f0e3d1a13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,6 +33,7 @@
source
sink
packaging
+ testing
24.0.1
@@ -57,6 +58,7 @@
3.3.0
3.1.2
3.1.2
+ 5.1.0
5.5.0
4.4.9
UTF-8
@@ -162,6 +164,11 @@
annotations
${annotations.version}
+
+ org.mockito.kotlin
+ mockito-kotlin
+ ${mockito-kotlin.version}
+
org.neo4j.driver
neo4j-java-driver
diff --git a/source/pom.xml b/source/pom.xml
index 4a73f04b7..91482fc5c 100644
--- a/source/pom.xml
+++ b/source/pom.xml
@@ -12,10 +12,6 @@
source
Neo4j Connector for Kafka - Source
-
- com.fasterxml.jackson.core
- jackson-databind
-
org.neo4j.connectors.kafka
common
@@ -31,11 +27,6 @@
${kafka.version}
provided
-
- io.confluent
- kafka-avro-serializer
- test
-
org.assertj
assertj-core
@@ -48,7 +39,7 @@
org.jetbrains.kotlin
- kotlin-test
+ kotlin-test-junit5
test
@@ -61,6 +52,12 @@
mockito-core
test
+
+ org.neo4j.connectors.kafka
+ testing
+ ${project.version}
+ test
+
org.slf4j
slf4j-nop
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt
index f2abe288d..90d3fbef5 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt
+++ b/source/src/test/kotlin/org/neo4j/connectors/kafka/source/Neo4jSourceIT.kt
@@ -21,8 +21,9 @@ import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInfo
-import org.neo4j.connectors.kafka.source.testing.Neo4jSource
-import org.neo4j.connectors.kafka.source.testing.TopicVerifier
+import org.neo4j.connectors.kafka.testing.Neo4jSource
+import org.neo4j.connectors.kafka.testing.TopicConsumer
+import org.neo4j.connectors.kafka.testing.TopicVerifier
import org.neo4j.driver.Session
class Neo4jSourceIT {
@@ -34,14 +35,13 @@ class Neo4jSourceIT {
@Neo4jSource(
topic = TOPIC,
streamingProperty = "timestamp",
- streamingFrom = StreamingFrom.ALL,
+ streamingFrom = "ALL",
streamingQuery =
- "MATCH (ts:TestSource) WHERE ts.timestamp > \$lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, ts.execId AS execId",
- consumerOffset = "earliest",
- )
+ "MATCH (ts:TestSource) WHERE ts.timestamp > \$lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp, ts.execId AS execId")
@Test
fun `reads latest changes from Neo4j source`(
testInfo: TestInfo,
+ @TopicConsumer(topic = TOPIC, offset = "earliest")
consumer: KafkaConsumer,
session: Session
) {
diff --git a/testing/LICENSES.txt b/testing/LICENSES.txt
new file mode 100644
index 000000000..155b37a8d
--- /dev/null
+++ b/testing/LICENSES.txt
@@ -0,0 +1,635 @@
+This file contains the full license text of the included third party
+libraries. For an overview of the licenses see the NOTICE.txt file.
+
+
+------------------------------------------------------------------------------
+Apache Software License, Version 2.0
+ Apache Avro
+ Apache Commons Compress
+ Apache Kafka
+ Apache Yetus - Audience Annotations
+ Awaitility
+ config
+ Jackson-annotations
+ Jackson-core
+ jackson-databind
+ JetBrains Java Annotations
+ kafka-avro-serializer
+ kafka-schema-registry-client
+ Kotlin Stdlib
+ Kotlin Stdlib Common
+ LZ4 and xxHash
+ Neo4j Java Driver
+ Netty
+ org.apiguardian:apiguardian-api
+ org.opentest4j:opentest4j
+ snappy-java
+ utils
+ zookeeper
+------------------------------------------------------------------------------
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+
+------------------------------------------------------------------------------
+BSD License
+ Hamcrest
+ JLine
+------------------------------------------------------------------------------
+
+Copyright (c) ,
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of the nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
+------------------------------------------------------------------------------
+BSD License 2-clause
+ zstd-jni
+------------------------------------------------------------------------------
+
+Copyright
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
+------------------------------------------------------------------------------
+Eclipse Public License v2.0
+ JUnit Jupiter API
+ JUnit Platform Commons
+------------------------------------------------------------------------------
+
+Eclipse Public License - v 2.0
+
+ THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+ PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
+ OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+ a) in the case of the initial Contributor, the initial content
+ Distributed under this Agreement, and
+
+ b) in the case of each subsequent Contributor:
+ i) changes to the Program, and
+ ii) additions to the Program;
+ where such changes and/or additions to the Program originate from
+ and are Distributed by that particular Contributor. A Contribution
+ "originates" from a Contributor if it was added to the Program by
+ such Contributor itself or anyone acting on such Contributor's behalf.
+ Contributions do not include changes or additions to the Program that
+ are not Modified Works.
+
+"Contributor" means any person or entity that Distributes the Program.
+
+"Licensed Patents" mean patent claims licensable by a Contributor which
+are necessarily infringed by the use or sale of its Contribution alone
+or when combined with the Program.
+
+"Program" means the Contributions Distributed in accordance with this
+Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement
+or any Secondary License (as applicable), including Contributors.
+
+"Derivative Works" shall mean any work, whether in Source Code or other
+form, that is based on (or derived from) the Program and for which the
+editorial revisions, annotations, elaborations, or other modifications
+represent, as a whole, an original work of authorship.
+
+"Modified Works" shall mean any work in Source Code or other form that
+results from an addition to, deletion from, or modification of the
+contents of the Program, including, for purposes of clarity any new file
+in Source Code form that contains any contents of the Program. Modified
+Works shall not include works that contain only declarations,
+interfaces, types, classes, structures, or files of the Program solely
+in each case in order to link to, bind by name, or subclass the Program
+or Modified Works thereof.
+
+"Distribute" means the acts of a) distributing or b) making available
+in any manner that enables the transfer of a copy.
+
+"Source Code" means the form of a Program preferred for making
+modifications, including but not limited to software source code,
+documentation source, and configuration files.
+
+"Secondary License" means either the GNU General Public License,
+Version 2.0, or any later versions of that license, including any
+exceptions or additional permissions as identified by the initial
+Contributor.
+
+2. GRANT OF RIGHTS
+
+ a) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free copyright
+ license to reproduce, prepare Derivative Works of, publicly display,
+ publicly perform, Distribute and sublicense the Contribution of such
+ Contributor, if any, and such Derivative Works.
+
+ b) Subject to the terms of this Agreement, each Contributor hereby
+ grants Recipient a non-exclusive, worldwide, royalty-free patent
+ license under Licensed Patents to make, use, sell, offer to sell,
+ import and otherwise transfer the Contribution of such Contributor,
+ if any, in Source Code or other form. This patent license shall
+ apply to the combination of the Contribution and the Program if, at
+ the time the Contribution is added by the Contributor, such addition
+ of the Contribution causes such combination to be covered by the
+ Licensed Patents. The patent license shall not apply to any other
+ combinations which include the Contribution. No hardware per se is
+ licensed hereunder.
+
+ c) Recipient understands that although each Contributor grants the
+ licenses to its Contributions set forth herein, no assurances are
+ provided by any Contributor that the Program does not infringe the
+ patent or other intellectual property rights of any other entity.
+ Each Contributor disclaims any liability to Recipient for claims
+ brought by any other entity based on infringement of intellectual
+ property rights or otherwise. As a condition to exercising the
+ rights and licenses granted hereunder, each Recipient hereby
+ assumes sole responsibility to secure any other intellectual
+ property rights needed, if any. For example, if a third party
+ patent license is required to allow Recipient to Distribute the
+ Program, it is Recipient's responsibility to acquire that license
+ before distributing the Program.
+
+ d) Each Contributor represents that to its knowledge it has
+ sufficient copyright rights in its Contribution, if any, to grant
+ the copyright license set forth in this Agreement.
+
+ e) Notwithstanding the terms of any Secondary License, no
+ Contributor makes additional grants to any Recipient (other than
+ those set forth in this Agreement) as a result of such Recipient's
+ receipt of the Program under the terms of a Secondary License
+ (if permitted under the terms of Section 3).
+
+3. REQUIREMENTS
+
+3.1 If a Contributor Distributes the Program in any form, then:
+
+ a) the Program must also be made available as Source Code, in
+ accordance with section 3.2, and the Contributor must accompany
+ the Program with a statement that the Source Code for the Program
+ is available under this Agreement, and informs Recipients how to
+ obtain it in a reasonable manner on or through a medium customarily
+ used for software exchange; and
+
+ b) the Contributor may Distribute the Program under a license
+ different than this Agreement, provided that such license:
+ i) effectively disclaims on behalf of all other Contributors all
+ warranties and conditions, express and implied, including
+ warranties or conditions of title and non-infringement, and
+ implied warranties or conditions of merchantability and fitness
+ for a particular purpose;
+
+ ii) effectively excludes on behalf of all other Contributors all
+ liability for damages, including direct, indirect, special,
+ incidental and consequential damages, such as lost profits;
+
+ iii) does not attempt to limit or alter the recipients' rights
+ in the Source Code under section 3.2; and
+
+ iv) requires any subsequent distribution of the Program by any
+ party to be under a license that satisfies the requirements
+ of this section 3.
+
+3.2 When the Program is Distributed as Source Code:
+
+ a) it must be made available under this Agreement, or if the
+ Program (i) is combined with other material in a separate file or
+ files made available under a Secondary License, and (ii) the initial
+ Contributor attached to the Source Code the notice described in
+ Exhibit A of this Agreement, then the Program may be made available
+ under the terms of such Secondary Licenses, and
+
+ b) a copy of this Agreement must be included with each copy of
+ the Program.
+
+3.3 Contributors may not remove or alter any copyright, patent,
+trademark, attribution notices, disclaimers of warranty, or limitations
+of liability ("notices") contained within the Program from any copy of
+the Program which they Distribute, provided that Contributors may add
+their own appropriate notices.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities
+with respect to end users, business partners and the like. While this
+license is intended to facilitate the commercial use of the Program,
+the Contributor who includes the Program in a commercial product
+offering should do so in a manner which does not create potential
+liability for other Contributors. Therefore, if a Contributor includes
+the Program in a commercial product offering, such Contributor
+("Commercial Contributor") hereby agrees to defend and indemnify every
+other Contributor ("Indemnified Contributor") against any losses,
+damages and costs (collectively "Losses") arising from claims, lawsuits
+and other legal actions brought by a third party against the Indemnified
+Contributor to the extent caused by the acts or omissions of such
+Commercial Contributor in connection with its distribution of the Program
+in a commercial product offering. The obligations in this section do not
+apply to any claims or Losses relating to any actual or alleged
+intellectual property infringement. In order to qualify, an Indemnified
+Contributor must: a) promptly notify the Commercial Contributor in
+writing of such claim, and b) allow the Commercial Contributor to control,
+and cooperate with the Commercial Contributor in, the defense and any
+related settlement negotiations. The Indemnified Contributor may
+participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial
+product offering, Product X. That Contributor is then a Commercial
+Contributor. If that Commercial Contributor then makes performance
+claims, or offers warranties related to Product X, those performance
+claims and warranties are such Commercial Contributor's responsibility
+alone. Under this section, the Commercial Contributor would have to
+defend claims against the other Contributors related to those performance
+claims and warranties, and if a court requires any other Contributor to
+pay any damages as a result, the Commercial Contributor must pay
+those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
+BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
+IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
+TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+PURPOSE. Each Recipient is solely responsible for determining the
+appropriateness of using and distributing the Program and assumes all
+risks associated with its exercise of rights under this Agreement,
+including but not limited to the risks and costs of program errors,
+compliance with applicable laws, damage to or loss of data, programs
+or equipment, and unavailability or interruption of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
+SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
+PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
+EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under
+applicable law, it shall not affect the validity or enforceability of
+the remainder of the terms of this Agreement, and without further
+action by the parties hereto, such provision shall be reformed to the
+minimum extent necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity
+(including a cross-claim or counterclaim in a lawsuit) alleging that the
+Program itself (excluding combinations of the Program with other software
+or hardware) infringes such Recipient's patent(s), then such Recipient's
+rights granted under Section 2(b) shall terminate as of the date such
+litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it
+fails to comply with any of the material terms or conditions of this
+Agreement and does not cure such failure in a reasonable period of
+time after becoming aware of such noncompliance. If all Recipient's
+rights under this Agreement terminate, Recipient agrees to cease use
+and distribution of the Program as soon as reasonably practicable.
+However, Recipient's obligations under this Agreement and any licenses
+granted by Recipient relating to the Program shall continue and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement,
+but in order to avoid inconsistency the Agreement is copyrighted and
+may only be modified in the following manner. The Agreement Steward
+reserves the right to publish new versions (including revisions) of
+this Agreement from time to time. No one other than the Agreement
+Steward has the right to modify this Agreement. The Eclipse Foundation
+is the initial Agreement Steward. The Eclipse Foundation may assign the
+responsibility to serve as the Agreement Steward to a suitable separate
+entity. Each new version of the Agreement will be given a distinguishing
+version number. The Program (including Contributions) may always be
+Distributed subject to the version of the Agreement under which it was
+received. In addition, after a new version of the Agreement is published,
+Contributor may elect to Distribute the Program (including its
+Contributions) under the new version.
+
+Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
+receives no rights or licenses to the intellectual property of any
+Contributor under this Agreement, whether expressly, by implication,
+estoppel or otherwise. All rights in the Program not expressly granted
+under this Agreement are reserved. Nothing in this Agreement is intended
+to be enforceable by any entity that is not a Contributor or Recipient.
+No third-party beneficiary rights are created under this Agreement.
+
+Exhibit A - Form of Secondary Licenses Notice
+
+"This Source Code may also be made available under the following
+Secondary Licenses when the conditions for such availability set forth
+in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
+version(s), and exceptions or additional permissions here}."
+
+ Simply including a copy of this Agreement, including this Exhibit A
+ is not sufficient to license the Source Code under Secondary Licenses.
+
+ If it is not possible or desirable to put the notice in a particular
+ file, then You may include the notice in a location (such as a LICENSE
+ file in a relevant directory) where a recipient would be likely to
+ look for such a notice.
+
+ You may add additional accurate notices of copyright ownership.
+
+
+------------------------------------------------------------------------------
+MIT License
+ SLF4J API Module
+------------------------------------------------------------------------------
+
+The MIT License
+
+Copyright (c)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
+
+
+------------------------------------------------------------------------------
+MIT No Attribution License
+ reactive-streams
+------------------------------------------------------------------------------
+
+The MIT No Attribution License
+
+Copyright
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this
+software and associated documentation files (the "Software"), to deal in the Software
+without restriction, including without limitation the rights to use, copy, modify,
+merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+
diff --git a/testing/NOTICE.txt b/testing/NOTICE.txt
new file mode 100644
index 000000000..501765aaf
--- /dev/null
+++ b/testing/NOTICE.txt
@@ -0,0 +1,61 @@
+Copyright (c) "Neo4j"
+Neo4j Sweden AB [http://neo4j.com]
+
+This file is part of Neo4j.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Full license texts are found in LICENSES.txt.
+
+
+Third-party licenses
+--------------------
+
+Apache Software License, Version 2.0
+ Apache Avro
+ Apache Commons Compress
+ Apache Kafka
+ Apache Yetus - Audience Annotations
+ Awaitility
+ config
+ Jackson-annotations
+ Jackson-core
+ jackson-databind
+ JetBrains Java Annotations
+ kafka-avro-serializer
+ kafka-schema-registry-client
+ Kotlin Stdlib
+ Kotlin Stdlib Common
+ LZ4 and xxHash
+ Neo4j Java Driver
+ Netty
+ org.apiguardian:apiguardian-api
+ org.opentest4j:opentest4j
+ snappy-java
+ utils
+ zookeeper
+
+BSD License
+ Hamcrest
+ JLine
+
+BSD License 2-clause
+ zstd-jni
+
+Eclipse Public License v2.0
+ JUnit Jupiter API
+ JUnit Platform Commons
+
+MIT License
+ SLF4J API Module
+
+MIT No Attribution License
+ reactive-streams
+
diff --git a/testing/pom.xml b/testing/pom.xml
new file mode 100644
index 000000000..ddf526677
--- /dev/null
+++ b/testing/pom.xml
@@ -0,0 +1,78 @@
+
+
+ 4.0.0
+
+ org.neo4j.connectors.kafka
+ parent
+ 5.1.0-SNAPSHOT
+
+ testing
+ jar
+ testing
+ Neo4j Connector for Kafka - Testing Library
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ io.confluent
+ kafka-avro-serializer
+
+
+
+ com.101tec
+ zkclient
+
+
+
+
+ org.apache.avro
+ avro
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+ org.awaitility
+ awaitility
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+
+
+ org.neo4j.driver
+ neo4j-java-driver
+
+
+ org.jetbrains.kotlin
+ kotlin-test-junit5
+ test
+
+
+ org.mockito.kotlin
+ mockito-kotlin
+ test
+
+
+ org.slf4j
+ slf4j-nop
+ test
+
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-maven-plugin
+
+
+
+
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSource.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSource.kt
similarity index 86%
rename from source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSource.kt
rename to testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSource.kt
index a507b2b43..70e5250b8 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSource.kt
+++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSource.kt
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.connectors.kafka.source.testing
+package org.neo4j.connectors.kafka.testing
import org.junit.jupiter.api.extension.ExtendWith
-import org.neo4j.connectors.kafka.source.StreamingFrom
const val DEFAULT_TO_ENV = "___UNSET___"
@@ -35,7 +34,6 @@ annotation class Neo4jSource(
val neo4jPassword: String = DEFAULT_TO_ENV,
val topic: String,
val streamingProperty: String,
- val streamingFrom: StreamingFrom,
- val streamingQuery: String,
- val consumerOffset: String = "latest"
+ val streamingFrom: String,
+ val streamingQuery: String
)
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceExtension.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtension.kt
similarity index 80%
rename from source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceExtension.kt
rename to testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtension.kt
index 559c6d9e6..de62e68b6 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceExtension.kt
+++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtension.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.connectors.kafka.source.testing
+package org.neo4j.connectors.kafka.testing
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
@@ -39,18 +39,20 @@ import org.neo4j.driver.Driver
import org.neo4j.driver.GraphDatabase
import org.neo4j.driver.Session
-class Neo4jSourceExtension :
- ExecutionCondition, BeforeEachCallback, AfterEachCallback, ParameterResolver {
+internal class Neo4jSourceExtension(
+ private val consumerSupplier: ConsumerSupplier = DefaultConsumerSupplier
+) : ExecutionCondition, BeforeEachCallback, AfterEachCallback, ParameterResolver {
companion object {
val PARAMETER_RESOLVERS:
Map, (Neo4jSourceExtension, ParameterContext?, ExtensionContext?) -> Any> =
mapOf(
KafkaConsumer::class.java to Neo4jSourceExtension::resolveConsumer,
- Session::class.java to Neo4jSourceExtension::resolveSession)
+ Session::class.java to Neo4jSourceExtension::resolveSession,
+ )
}
- private lateinit var metadata: Neo4jSource
+ /*visible for testing */ internal lateinit var sourceAnnotation: Neo4jSource
private lateinit var source: Neo4jSourceRegistration
private lateinit var driver: Driver
@@ -111,7 +113,7 @@ class Neo4jSourceExtension :
)
}
- this.metadata = metadata
+ this.sourceAnnotation = metadata
return ConditionEvaluationResult.enabled("@Neo4jSource and environment properly configured")
}
@@ -121,16 +123,16 @@ class Neo4jSourceExtension :
}
source =
Neo4jSourceRegistration(
- schemaControlRegistryUri = schemaRegistryUri.read(metadata),
- neo4jUri = neo4jUri.read(metadata),
- neo4jUser = neo4jUser.read(metadata),
- neo4jPassword = neo4jPassword.read(metadata),
- topic = metadata.topic,
- streamingProperty = metadata.streamingProperty,
- streamingFrom = metadata.streamingFrom,
- streamingQuery = metadata.streamingQuery,
+ schemaControlRegistryUri = schemaRegistryUri.read(sourceAnnotation),
+ neo4jUri = neo4jUri.read(sourceAnnotation),
+ neo4jUser = neo4jUser.read(sourceAnnotation),
+ neo4jPassword = neo4jPassword.read(sourceAnnotation),
+ topic = sourceAnnotation.topic,
+ streamingProperty = sourceAnnotation.streamingProperty,
+ streamingFrom = sourceAnnotation.streamingFrom,
+ streamingQuery = sourceAnnotation.streamingQuery,
)
- source.register(kafkaConnectExternalUri.read(metadata))
+ source.register(kafkaConnectExternalUri.read(sourceAnnotation))
}
override fun afterEach(context: ExtensionContext?) {
@@ -156,7 +158,8 @@ class Neo4jSourceExtension :
val resolver =
PARAMETER_RESOLVERS[parameterType]
?: throw ParameterResolutionException(
- "@Neo4jSource does not support injection of parameters typed $parameterType")
+ "@Neo4jSource does not support injection of parameters typed $parameterType",
+ )
return resolver(this, parameterContext, extensionContext)
}
@@ -167,11 +170,11 @@ class Neo4jSourceExtension :
val properties = Properties()
properties.setProperty(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
- brokerExternalHost.read(metadata),
+ brokerExternalHost.read(sourceAnnotation),
)
properties.setProperty(
KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
- schemaRegistryExternalUri.read(metadata),
+ schemaRegistryExternalUri.read(sourceAnnotation),
)
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
@@ -182,19 +185,18 @@ class Neo4jSourceExtension :
KafkaAvroDeserializer::class.java.getName(),
)
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, extensionContext?.displayName)
- properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, metadata.consumerOffset)
- val consumer = KafkaConsumer(properties)
- consumer.subscribe(listOf(metadata.topic))
- return consumer
+ val consumerAnnotation = parameterContext?.parameter?.getAnnotation(TopicConsumer::class.java)!!
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAnnotation.offset)
+ return consumerSupplier.getSubscribed(properties, consumerAnnotation.topic)
}
private fun resolveSession(
parameterContext: ParameterContext?,
extensionContext: ExtensionContext?
): Any {
- val uri = neo4jExternalUri.read(metadata)
- val username = neo4jUser.read(metadata)
- val password = neo4jPassword.read(metadata)
+ val uri = neo4jExternalUri.read(sourceAnnotation)
+ val username = neo4jUser.read(sourceAnnotation)
+ val password = neo4jPassword.read(sourceAnnotation)
driver = GraphDatabase.driver(uri, AuthTokens.basic(username, password))
// TODO: handle multiple parameter injection
session = driver.session()
@@ -218,7 +220,22 @@ class Neo4jSourceExtension :
}
}
-class EnvBackedSetting(
+internal interface ConsumerSupplier {
+ fun getSubscribed(properties: Properties, topic: String): KafkaConsumer
+}
+
+internal object DefaultConsumerSupplier : ConsumerSupplier {
+ override fun getSubscribed(
+ properties: Properties,
+ topic: String
+ ): KafkaConsumer {
+ val consumer = KafkaConsumer(properties)
+ consumer.subscribe(listOf(topic))
+ return consumer
+ }
+}
+
+internal class EnvBackedSetting(
private val name: String,
private val envVarName: String,
private val getter: (Neo4jSource) -> String
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceRegistration.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceRegistration.kt
similarity index 91%
rename from source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceRegistration.kt
rename to testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceRegistration.kt
index 83d14f11b..13cd500ea 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/Neo4jSourceRegistration.kt
+++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceRegistration.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.connectors.kafka.source.testing
+package org.neo4j.connectors.kafka.testing
import com.fasterxml.jackson.databind.ObjectMapper
import java.net.URI
@@ -25,10 +25,8 @@ import java.net.http.HttpResponse.BodyHandlers
import java.time.Duration
import java.util.concurrent.ThreadLocalRandom
import kotlin.streams.asSequence
-import org.neo4j.connectors.kafka.source.StreamingFrom
-import streams.kafka.connect.source.Neo4jSourceConnector
-class Neo4jSourceRegistration(
+internal class Neo4jSourceRegistration(
topic: String,
neo4jUri: String,
neo4jUser: String = "neo4j",
@@ -36,7 +34,7 @@ class Neo4jSourceRegistration(
pollInterval: Duration = Duration.ofMillis(5000),
enforceSchema: Boolean = true,
streamingProperty: String,
- streamingFrom: StreamingFrom,
+ streamingFrom: String,
streamingQuery: String,
schemaControlRegistryUri: String
) {
@@ -51,7 +49,7 @@ class Neo4jSourceRegistration(
"config" to
mapOf(
"topic" to topic,
- "connector.class" to Neo4jSourceConnector::class.java.name,
+ "connector.class" to "streams.kafka.connect.source.Neo4jSourceConnector",
"key.converter" to "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" to "http://schema-registry:8081",
"value.converter" to "io.confluent.connect.avro.AvroConverter",
@@ -61,7 +59,7 @@ class Neo4jSourceRegistration(
"neo4j.authentication.basic.password" to neo4jPassword,
"neo4j.streaming.poll.interval.msecs" to pollInterval.toMillis(),
"neo4j.streaming.property" to streamingProperty,
- "neo4j.streaming.from" to streamingFrom.name,
+ "neo4j.streaming.from" to streamingFrom,
"neo4j.enforce.schema" to enforceSchema,
"neo4j.source.query" to streamingQuery,
),
diff --git a/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicConsumer.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicConsumer.kt
new file mode 100644
index 000000000..033623ddf
--- /dev/null
+++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicConsumer.kt
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.connectors.kafka.testing
+
+@Target(AnnotationTarget.VALUE_PARAMETER)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TopicConsumer(val topic: String, val offset: String)
diff --git a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/TopicVerifier.kt b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicVerifier.kt
similarity index 98%
rename from source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/TopicVerifier.kt
rename to testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicVerifier.kt
index 24e2ecede..75812398f 100644
--- a/source/src/test/kotlin/org/neo4j/connectors/kafka/source/testing/TopicVerifier.kt
+++ b/testing/src/main/kotlin/org/neo4j/connectors/kafka/testing/TopicVerifier.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.connectors.kafka.source.testing
+package org.neo4j.connectors.kafka.testing
import java.time.Duration
import java.util.function.Predicate
diff --git a/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtensionTest.kt b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtensionTest.kt
new file mode 100644
index 000000000..a90b064aa
--- /dev/null
+++ b/testing/src/test/kotlin/org/neo4j/connectors/kafka/testing/Neo4jSourceExtensionTest.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.connectors.kafka.testing
+
+import java.lang.reflect.Parameter
+import kotlin.test.assertIs
+import org.apache.avro.generic.GenericRecord
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtensionContext
+import org.junit.jupiter.api.extension.ParameterContext
+import org.mockito.kotlin.anyOrNull
+import org.mockito.kotlin.doReturn
+import org.mockito.kotlin.mock
+import org.mockito.kotlin.verify
+import org.neo4j.driver.Session
+
+class Neo4jSourceExtensionTest {
+
+ @Test
+ fun `resolves Neo4j session`() {
+ val extension = Neo4jSourceExtension()
+ extension.sourceAnnotation =
+ Neo4jSource(
+ topic = "unused",
+ streamingQuery = "unused",
+ streamingFrom = "unused",
+ streamingProperty = "unused",
+ neo4jExternalUri = "neo4j://example.com",
+ neo4jUri = "neo4j://example.com",
+ neo4jUser = "neo4j",
+ neo4jPassword = "s3cr3t!!",
+ )
+ val parameter = mock { on { getType() } doReturn Session::class.java }
+ val parameterContext = mock { on { getParameter() } doReturn parameter }
+ val extensionContext = mock()
+
+ val session = extension.resolveParameter(parameterContext, extensionContext)
+
+ assertIs(session)
+ }
+
+ @Test
+ fun `resolves Kafka consumer`() {
+ val consumer = mock>()
+ val consumerSupplier =
+ mock> {
+ on { getSubscribed(anyOrNull(), anyOrNull()) } doReturn consumer
+ }
+ val extension = Neo4jSourceExtension(consumerSupplier)
+ extension.sourceAnnotation =
+ Neo4jSource(
+ topic = "unused",
+ streamingQuery = "unused",
+ streamingFrom = "unused",
+ streamingProperty = "unused",
+ neo4jUri = "unused",
+ neo4jUser = "unused",
+ neo4jPassword = "unused",
+ brokerExternalHost = "example.com",
+ schemaControlRegistryExternalUri = "example.com")
+
+ val annotation = TopicConsumer(topic = "topic", offset = "earliest")
+ val parameter =
+ mock {
+ on { getType() } doReturn KafkaConsumer::class.java
+ on { getAnnotation(TopicConsumer::class.java) } doReturn annotation
+ }
+ val parameterContext = mock { on { getParameter() } doReturn parameter }
+ val extensionContext =
+ mock { on { displayName } doReturn "some-running-test" }
+
+ val actualConsumer = extension.resolveParameter(parameterContext, extensionContext)
+
+ assertIs>(actualConsumer)
+ verify(consumerSupplier).getSubscribed(anyOrNull(), anyOrNull())
+ }
+}