Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
neo4j.eos-offset-label=Type: String;\nDescription: The label name attached to the nodes used to store offsets to achieve exactly-once semantics. \
Currently only supported with CDC sink strategies with target databases having `apoc.cypher.doIt` procedure available. \
This label needs to have a single node key constraint on the 'strategy', 'topic' and 'partition' properties.
neo4j.cdc.max-batched-queries=Type: Integer;\nDescription: Maximum number of combined queries to be executed when processing CDC events.
neo4j.max-batched-queries=Type: Integer;\nDescription: Maximum number of combined queries to be executed when processing events.
neo4j.cdc.source-id.topics=Type: List<String>;\nDescription: The topic(s) that contain CDC events generated by a Source instance of this connector using `CDC` source strategy.
neo4j.cdc.source-id.label-name=Type: String;\nDescription: The label name attached to the nodes managed by the `CDC Source Id` strategy.
neo4j.cdc.source-id.property-name=Type: String;\nDescription: The id property name attached to the nodes managed by the `CDC Source Id` strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class SinkConfiguration : Neo4jConfiguration {
const val CYPHER_BIND_KEY_AS = "neo4j.cypher.bind-key-as"
const val CYPHER_BIND_VALUE_AS = "neo4j.cypher.bind-value-as"
const val CYPHER_BIND_VALUE_AS_EVENT = "neo4j.cypher.bind-value-as-event"
const val CDC_MAX_BATCHED_QUERIES = "neo4j.cdc.max-batched-queries"
const val MAX_BATCHED_QUERIES = "neo4j.max-batched-queries"
const val CDC_SOURCE_ID_TOPICS = "neo4j.cdc.source-id.topics"
const val CDC_SOURCE_ID_LABEL_NAME = "neo4j.cdc.source-id.label-name"
const val CDC_SOURCE_ID_PROPERTY_NAME = "neo4j.cdc.source-id.property-name"
Expand All @@ -172,7 +172,7 @@ class SinkConfiguration : Neo4jConfiguration {
const val DEFAULT_BIND_KEY_ALIAS = "__key"
const val DEFAULT_BIND_VALUE_ALIAS = "__value"
const val DEFAULT_CYPHER_BIND_VALUE_AS_EVENT = true
const val DEFAULT_CDC_MAX_BATCHED_QUERIES = 50
const val DEFAULT_MAX_BATCHED_QUERIES = 50
const val DEFAULT_SOURCE_ID_LABEL_NAME = "SourceEvent"
const val DEFAULT_SOURCE_ID_PROPERTY_NAME = "sourceId"

Expand Down Expand Up @@ -265,15 +265,11 @@ class SinkConfiguration : Neo4jConfiguration {
}
)
.define(
ConfigKeyBuilder.of(CDC_MAX_BATCHED_QUERIES, ConfigDef.Type.INT) {
ConfigKeyBuilder.of(MAX_BATCHED_QUERIES, ConfigDef.Type.INT) {
importance = ConfigDef.Importance.LOW
defaultValue = DEFAULT_CDC_MAX_BATCHED_QUERIES
defaultValue = DEFAULT_MAX_BATCHED_QUERIES
group = Groups.CONNECTOR_ADVANCED.title
recommender =
Recommenders.visibleIfNotEmpty(
Predicate.isEqual<String>(CDC_SOURCE_ID_TOPICS)
.or(Predicate.isEqual(CDC_SCHEMA_TOPICS))
)
validator = ConfigDef.Range.atLeast(1)
}
)
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ interface SinkStrategyHandler {
} else {
NativeBatchStrategy(
config.neo4j(),
config.getInt(SinkConfiguration.CDC_MAX_BATCHED_QUERIES),
config.getInt(SinkConfiguration.MAX_BATCHED_QUERIES),
config.batchSize,
config.eosOffsetLabel,
SinkStrategy.CDC_SOURCE_ID,
Expand Down Expand Up @@ -265,7 +265,7 @@ interface SinkStrategyHandler {
} else {
NativeBatchStrategy(
config.neo4j(),
config.getInt(SinkConfiguration.CDC_MAX_BATCHED_QUERIES),
config.getInt(SinkConfiguration.MAX_BATCHED_QUERIES),
config.batchSize,
config.eosOffsetLabel,
SinkStrategy.CDC_SCHEMA,
Expand Down