fix: InsertStream CONFLICT_IGNORE + PER_STREAM rolled back stream on commit-time duplicate (#4214)#4262
Conversation
…commit-time duplicate (#4214) Resolve CONFLICT_IGNORE synchronously via a pre-save key-column lookup so the duplicate row never reaches save() and the deferred commit-time unique-index check never sees it. Bumps `ignored` and continues instead of letting the whole stream roll back as `failed`.
Up to standards ✅🟢 Issues
|
| Metric | Results |
|---|---|
| Complexity | 9 |
🟢 Coverage 90.00% diff coverage · -7.47% coverage variation
Metric Results Coverage variation ✅ -7.47% coverage variation Diff coverage ✅ 90.00% diff coverage Coverage variation details
Coverable lines Covered lines Coverage Common ancestor commit (6eb94b1) 127277 93796 73.69% Head commit (8d2c7a6) 158965 (+31688) 105270 (+11474) 66.22% (-7.47%) Coverage variation is the difference between the coverage for the head and common ancestor commits of the pull request branch:
<coverage of head commit> - <coverage of common ancestor commit>Diff coverage details
Coverable lines Covered lines Diff coverage Pull request (#4262) 10 9 90.00% Diff coverage is the percentage of lines that are covered by tests out of the coverable lines that the pull request added or modified:
<covered lines added or modified>/<coverable lines added or modified> * 100%
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request implements support for the CONFLICT_IGNORE mode in gRPC insert streams, allowing the server to skip duplicate records based on specified key columns instead of failing the transaction. The changes include the addition of helper methods to check for existing records and updates to the insertion logic to track ignored rows. A new integration test verifies this behavior. Feedback from the review suggests consolidating the duplicated existence check methods into a single helper and removing redundant checks for empty key columns at the call sites.
| private boolean keyExistsVertex(final InsertContext ctx, final MutableVertex incoming) { | ||
| if (ctx.keyCols.isEmpty()) | ||
| return false; | ||
| final var inDoc = incoming.asDocument(); | ||
| final String where = String.join(" AND ", ctx.keyCols.stream().map(k -> k + " = ?").toList()); | ||
| final Object[] params = ctx.keyCols.stream().map(inDoc::get).toArray(); | ||
| try (final var rs = ctx.db.query("sql", "SELECT FROM " + ctx.opts.getTargetClass() + " WHERE " + where, params)) { | ||
| return rs.hasNext(); | ||
| } | ||
| } | ||
|
|
||
| private boolean keyExistsDocument(final InsertContext ctx, final MutableDocument incoming) { | ||
| if (ctx.keyCols.isEmpty()) | ||
| return false; | ||
| final String where = String.join(" AND ", ctx.keyCols.stream().map(k -> k + " = ?").toList()); | ||
| final Object[] params = ctx.keyCols.stream().map(incoming::get).toArray(); | ||
| try (final ResultSet rs = ctx.db.query("sql", "SELECT FROM " + ctx.opts.getTargetClass() + " WHERE " + where, params)) { | ||
| return rs.hasNext(); | ||
| } | ||
| } |
There was a problem hiding this comment.
The methods keyExistsVertex and keyExistsDocument are identical in logic. They can be merged into a single helper method that accepts a Document parameter, as both MutableVertex and MutableDocument implement this interface. This reduces code duplication and improves maintainability.
private boolean keyExists(final InsertContext ctx, final Document incoming) {
if (ctx.keyCols.isEmpty())
return false;
final String where = String.join(" AND ", ctx.keyCols.stream().map(k -> k + " = ?").toList());
final Object[] params = ctx.keyCols.stream().map(incoming::get).toArray();
try (final ResultSet rs = ctx.db.query("sql", "SELECT FROM " + ctx.opts.getTargetClass() + " WHERE " + where, params)) {
return rs.hasNext();
}
}There was a problem hiding this comment.
Merged into a single keyExists(InsertContext, MutableDocument) in f78b9fa. Used MutableDocument instead of Document since MutableVertex extends MutableDocument - the call sites pass mutable instances directly.
| applyGrpcRecord(v, r); | ||
| if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_UPDATE && tryUpsertVertex(ctx, v)) { | ||
| c.updated++; | ||
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && !ctx.keyCols.isEmpty() && keyExistsVertex(ctx, v)) { |
There was a problem hiding this comment.
The check !ctx.keyCols.isEmpty() is redundant here because it is already performed inside the keyExists method (or the merged version suggested). Removing it simplifies the condition. Also, the call should be updated to use the merged keyExists method.
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && !ctx.keyCols.isEmpty() && keyExistsVertex(ctx, v)) { | |
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && keyExists(ctx, v)) { |
| applyGrpcRecord(d, r); | ||
| if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_UPDATE && tryUpsertDocument(ctx, d)) { | ||
| c.updated++; | ||
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && !ctx.keyCols.isEmpty() && keyExistsDocument(ctx, d)) { |
There was a problem hiding this comment.
The check !ctx.keyCols.isEmpty() is redundant here because it is already performed inside the keyExists method (or the merged version suggested). Removing it simplifies the condition. Also, the call should be updated to use the merged keyExists method.
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && !ctx.keyCols.isEmpty() && keyExistsDocument(ctx, d)) { | |
| } else if (ctx.opts.getConflictMode() == ConflictMode.CONFLICT_IGNORE && keyExists(ctx, d)) { |
Code ReviewOverviewThis PR fixes a real regression (#4214) where Issues1. Code duplication -
|
…dd vertex test Address external review feedback: - Merge keyExistsVertex + keyExistsDocument into one keyExists(InsertContext, MutableDocument) since MutableVertex extends MutableDocument - Drop redundant !keyCols.isEmpty() guard at call sites - the method already short-circuits - Add second regression test covering the VERTEX TYPE path (the document-only test missed it before)
|
Addressed in f78b9fa:
Not addressed:
|
Code ReviewOverviewThis PR fixes a real bug: What works well
Concerns1. Residual blind spot in
|
Extends the issue #4214 regression to the PER_BATCH transaction mode for insertStream. The pre-save keyExists check applies uniformly across transaction modes; this case was missing dedicated coverage.
|
Addressed in 8d2c7a6:
Pushing back on the other items: #1 #2 Comment on #3 Extract shared #4 Test style nits (static interceptor, |
Code ReviewGood fix for a real and reproducible bug. The approach is consistent with the existing CorrectnessThe core fix is sound. Pre-checking with In-stream duplicates (two rows with the same key, neither pre-existing): Covered by two mechanisms working together:
TOCTOU race (concurrent streams with the same key): Acknowledged in the PR description as acceptable, consistent with Limitation worth documentingWhen Security - Column name injection (pre-existing, applies equally to new code)// keyExists - and identically in tryUpsertVertex / tryUpsertDocument
final String where = String.join(" AND ", ctx.keyCols.stream().map(k -> k + " = ?").toList());Column names and the target-class name are concatenated directly into the SQL string. Values are parameterized (safe), but column names from the gRPC request are not. A client supplying a This PR does not introduce the problem, but since it adds a third call site of the same pattern, it is a good moment to address it - either via schema validation (verify each Test coverage gapAll three test methods share the same scenario structure: one pre-existing Alice, then stream [Bob, Alice (dup), Carol]. Consider adding a fourth test for in-stream duplicates with no pre-existing row - e.g. stream [Alice, Alice] - to confirm that the immediate-throw path in Minor nitsCode duplication: The WHERE-clause construction pattern now appears three times (
SummaryThe fix correctly resolves the reported bug for all tested modes (PER_STREAM documents, PER_BATCH documents, PER_STREAM vertices). The column-name injection concern and the missing in-stream-duplicate test are the two items I would prioritize before merge. Everything else is minor polish. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4262 +/- ##
=======================================
Coverage 64.63% 64.64%
=======================================
Files 1644 1644
Lines 127277 127287 +10
Branches 27267 27269 +2
=======================================
+ Hits 82266 82281 +15
+ Misses 33509 33508 -1
+ Partials 11502 11498 -4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Summary
Closes #4214.
InsertStreamwithconflict_mode = CONFLICT_IGNORE+transaction_mode = PER_STREAMused to roll back the entire stream on a commit-timeDuplicatedKeyException, reportingfailed = receivedinstead of bumpingignoredand committing the remaining rows.insertRows(...)already handledCONFLICT_IGNORE, but underPER_STREAMthe unique-index enforcement is deferred toctx.flushCommit(true)inonCompleted, whererecordCommitException(added in Bug:InsertStreamsurfaces commit-time constraint violations as a stream-levelINTERNALerror instead of populatingInsertSummary.errors#4198) reclassifies every row asfailedregardless ofconflict_mode.CONFLICT_IGNOREsynchronously via a pre-save key-column lookup (keyExistsVertex/keyExistsDocument, mirroring the existingtryUpsertVertex/tryUpsertDocumentpattern used byCONFLICT_UPDATE). The duplicate row never reachessave(), so the deferred commit never sees it;ignoredis bumped exactly once per skipped row and the remaining rows commit cleanly.Test plan
Issue4214InsertStreamConflictIgnoreIT: insertsBob, duplicateAlice,Carolagainst a pre-existingAliceunderPER_STREAM+CONFLICT_IGNORE+key_columns = ["name"]. Assertsreceived=3,inserted=2,ignored=1,failed=0,errors=[], and confirms all three names persist via a follow-upSELECT count(*).Issue4198InsertStreamCommitErrorITstill passes (CONFLICT_ERROR+ commit-time violation still surfaces structured errors).GrpcServerITsuite passes (28 tests, includingbulkInsertWithConflictIgnoreunderPER_BATCH).