Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/node_sqlite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1647,26 +1647,28 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
job->ScheduleBackup();
}

struct ConflictCallbackContext {
std::function<bool(std::string_view)> filterCallback;
std::function<int(int)> conflictCallback;
};

// the reason for using static functions here is that SQLite needs a
// function pointer
static std::function<int(int)> conflictCallback;

static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
return conflictCallback(eConflict);
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT;
return ctx->conflictCallback(eConflict);
}

static std::function<bool(std::string)> filterCallback;

static int xFilter(void* pCtx, const char* zTab) {
if (!filterCallback) return 1;

return filterCallback(zTab) ? 1 : 0;
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
if (!ctx->filterCallback) return 1;
return ctx->filterCallback(zTab) ? 1 : 0;
}

void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
conflictCallback = nullptr;
filterCallback = nullptr;
ConflictCallbackContext context;

DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
Expand Down Expand Up @@ -1702,7 +1704,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
return;
}
Local<Function> conflictFunc = conflictValue.As<Function>();
conflictCallback = [env, conflictFunc](int conflictType) -> int {
context.conflictCallback = [env, conflictFunc](int conflictType) -> int {
Local<Value> argv[] = {Integer::New(env->isolate(), conflictType)};
TryCatch try_catch(env->isolate());
Local<Value> result =
Expand Down Expand Up @@ -1740,15 +1742,18 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {

Local<Function> filterFunc = filterValue.As<Function>();

filterCallback = [env, filterFunc](std::string item) -> bool {
context.filterCallback = [env,
filterFunc](std::string_view item) -> bool {
// TODO(@jasnell): The use of ToLocalChecked here means that if
// the filter function throws an error the process will crash.
// The filterCallback should be updated to avoid the check and
// propagate the error correctly.
Local<Value> argv[] = {String::NewFromUtf8(env->isolate(),
item.c_str(),
NewStringType::kNormal)
.ToLocalChecked()};
Local<Value> argv[] = {
String::NewFromUtf8(env->isolate(),
item.data(),
NewStringType::kNormal,
static_cast<int>(item.size()))
.ToLocalChecked()};
Local<Value> result =
filterFunc->Call(env->context(), Null(env->isolate()), 1, argv)
.ToLocalChecked();
Expand All @@ -1764,7 +1769,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
const_cast<void*>(static_cast<const void*>(buf.data())),
xFilter,
xConflict,
nullptr);
static_cast<void*>(&context));
if (r == SQLITE_OK) {
args.GetReturnValue().Set(true);
return;
Expand Down
74 changes: 74 additions & 0 deletions test/parallel/test-sqlite-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const {
constants,
} = require('node:sqlite');
const { test, suite } = require('node:test');
const { nextDb } = require('../sqlite/next-db.js');
const { Worker } = require('worker_threads');
const { once } = require('events');

/**
* Convenience wrapper around assert.deepStrictEqual that sets a null
Expand Down Expand Up @@ -555,3 +558,74 @@ test('session supports ERM', (t) => {
message: /session is not open/,
});
});

test('concurrent applyChangeset with workers', async (t) => {
// Before adding this test, the callbacks were stored in static variables
// this could result in a crash
// this test is a regression test for that scenario

function modeToString(mode) {
if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT';
if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT';
}

const dbPath = nextDb();
const db1 = new DatabaseSync(dbPath);
const db2 = new DatabaseSync(':memory:');
const createTable = `
CREATE TABLE data(
key INTEGER PRIMARY KEY,
value TEXT
) STRICT`;
db1.exec(createTable);
db2.exec(createTable);
db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello');
db1.close();
const session = db2.createSession();
db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world');
const changeset = session.changeset(); // Changeset with conflict (for db1)

const iterations = 10;
for (let i = 0; i < iterations; i++) {
const workers = [];
const expectedResults = new Map([
[constants.SQLITE_CHANGESET_ABORT, false],
[constants.SQLITE_CHANGESET_OMIT, true]]
);

// Launch two workers (abort and omit modes)
for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) {
const worker = new Worker(`${__dirname}/../sqlite/worker.js`, {
workerData: {
dbPath,
changeset,
mode
},
});
workers.push(worker);
}

const results = await Promise.all(workers.map(async (worker) => {
const [message] = await once(worker, 'message');
return message;
}));

// Verify each result
for (const res of results) {
if (res.errorMessage) {
if (res.errcode === 5) { // SQLITE_BUSY
break; // ignore
}
t.assert.fail(`Worker error: ${res.error.message}`);
}
const expected = expectedResults.get(res.mode);
t.assert.strictEqual(
res.result,
expected,
`Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}`
);
}

workers.forEach((worker) => worker.terminate()); // Cleanup
}
});
10 changes: 1 addition & 9 deletions test/parallel/test-sqlite.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
'use strict';
const { spawnPromisified, skipIfSQLiteMissing } = require('../common');
skipIfSQLiteMissing();
const tmpdir = require('../common/tmpdir');
const { join } = require('node:path');
const { DatabaseSync, constants } = require('node:sqlite');
const { suite, test } = require('node:test');
const { pathToFileURL } = require('node:url');
let cnt = 0;

tmpdir.refresh();

function nextDb() {
return join(tmpdir.path, `database-${cnt++}.db`);
}
const { nextDb } = require('../sqlite/next-db.js');

suite('accessing the node:sqlite module', () => {
test('cannot be accessed without the node: scheme', (t) => {
Expand Down
14 changes: 14 additions & 0 deletions test/sqlite/next-db.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict';
require('../common');
const tmpdir = require('../common/tmpdir');
const { join } = require('node:path');

let cnt = 0;

tmpdir.refresh();

function nextDb() {
return join(tmpdir.path, `database-${cnt++}.db`);
}

module.exports = { nextDb };
24 changes: 24 additions & 0 deletions test/sqlite/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// This worker is used for one of the tests in test-sqlite-session.js

'use strict';
require('../common');
const { parentPort, workerData } = require('worker_threads');
const { DatabaseSync, constants } = require('node:sqlite');
const { changeset, mode, dbPath } = workerData;

const db = new DatabaseSync(dbPath);

const options = {};
if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) {
throw new Error('Unexpected value for mode');
}
options.onConflict = () => mode;

try {
const result = db.applyChangeset(changeset, options);
parentPort.postMessage({ mode, result, error: null });
} catch (error) {
parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode });
} finally {
db.close(); // Just to make sure it is closed ASAP
}
Loading