Skip to content

Data visitor #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/src/bindings/bindings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class _ObjectBoxBindings {
// common functions
void Function(Pointer<Int32> major, Pointer<Int32> minor, Pointer<Int32> patch) obx_version;
Pointer<Utf8> Function() obx_version_string;
int Function() obx_supports_bytes_array;

obx_free_dart_t<OBX_bytes_array> obx_bytes_array_free;
obx_free_dart_t<OBX_id_array> obx_id_array_free;
Expand Down Expand Up @@ -68,6 +69,10 @@ class _ObjectBoxBindings {
int Function(Pointer<Void> box, int id, Pointer<Pointer<Uint8>> data, Pointer<IntPtr> size) obx_box_get;
Pointer<OBX_bytes_array> Function(Pointer<Void> box, Pointer<OBX_id_array> ids) obx_box_get_many;
Pointer<OBX_bytes_array> Function(Pointer<Void> box) obx_box_get_all;
int Function(Pointer<Void> box, Pointer<OBX_id_array> ids, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor,
Pointer<Void> user_data) obx_box_visit_many;
int Function(Pointer<Void> box, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data)
obx_box_visit_all;
int Function(Pointer<Void> box, int id_or_zero) obx_box_id_for_put;
int Function(Pointer<Void> box, int count, Pointer<Uint64> out_first_id) obx_box_ids_for_put;
int Function(Pointer<Void> box, int id, Pointer<Uint8> data, int size, int mode) obx_box_put;
Expand Down Expand Up @@ -162,6 +167,7 @@ class _ObjectBoxBindings {
// common functions
obx_version = _fn<obx_version_native_t>("obx_version").asFunction();
obx_version_string = _fn<obx_version_string_native_t>("obx_version_string").asFunction();
obx_supports_bytes_array = _fn<obx_supports_bytes_array_native_t>("obx_supports_bytes_array").asFunction();
obx_bytes_array_free = _fn<obx_free_native_t<Pointer<OBX_bytes_array>>>("obx_bytes_array_free").asFunction();
obx_id_array_free = _fn<obx_free_native_t<Pointer<OBX_id_array>>>("obx_id_array_free").asFunction();
// obx_string_array_free = _fn<obx_free_native_t<Pointer<>>>("obx_string_array_free").asFunction();
Expand Down Expand Up @@ -215,6 +221,8 @@ class _ObjectBoxBindings {
obx_box_get = _fn<obx_box_get_native_t>("obx_box_get").asFunction();
obx_box_get_many = _fn<obx_box_get_many_native_t>("obx_box_get_many").asFunction();
obx_box_get_all = _fn<obx_box_get_all_native_t>("obx_box_get_all").asFunction();
obx_box_visit_many = _fn<obx_box_visit_many_native_t>("obx_box_visit_many").asFunction();
obx_box_visit_all = _fn<obx_box_visit_all_native_t>("obx_box_visit_all").asFunction();
obx_box_id_for_put = _fn<obx_box_id_for_put_native_t>("obx_box_id_for_put").asFunction();
obx_box_ids_for_put = _fn<obx_box_ids_for_put_native_t>("obx_box_ids_for_put").asFunction();
obx_box_put = _fn<obx_box_put_native_t>("obx_box_put").asFunction();
Expand Down
72 changes: 72 additions & 0 deletions lib/src/bindings/data_visitor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import 'dart:ffi';
import 'signatures.dart';
import "package:ffi/ffi.dart" show allocate, free;

/// This file implements C call forwarding using a trampoline approach.
///
/// When you want to pass a dart callback to a C function you cannot use lambdas and instead the callback must be
/// a static function, otherwise `Pointer.fromFunction()` called with your function won't compile.
/// Since static functions don't have any state, you must either rely on a global state or use a "userData" pointer
/// pass-through functionality provided by a C function.
///
/// The DataVisitor class tries to alleviate the burden of managing this and instead allows using lambdas from
/// user-code, internally mapping the C calls to the appropriate lambda.
///
/// Sample usage:
/// final results = <T>[];
/// final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
/// final bytes = dataPtr.asTypedList(length);
/// results.add(_fbManager.unmarshal(bytes));
/// return true; // return value usually indicates to the C function whether it should continue.
/// });
///
/// final err = bindings.obx_query_visit(_cQuery, visitor.fn, visitor.userData, offset, limit);
/// visitor.close(); // make sure to close the visitor, unregistering the callback it from the forwarder
/// checkObx(err);

int _lastId = 0;
final _callbacks = <int, bool Function(Pointer<Uint8> dataPtr, int length)>{};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we need that for? E.g. this is one of the visitor C APIs:

obx_err obx_query_visit(OBX_query* query, obx_data_visitor* visitor, void* user_data, uint64_t offset, uint64_t limit);

I assume you want to pass "id" as user_data? Is this more "convenient" that passing in a lambda-like thingy as visitor? The latter would already be thread-safe, which the current map-based is not, I think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lambdas can't be passed as a C callback, FFI requires static functions. I've tried that (lambda) approach but had to resolve to this trampoline one instead (basically the same thing we do in Go)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, please make sure to document that in the code so that this explanation is not lost

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter would already be thread-safe, which the current map-based is not, I think?

I think this was not addressed yet. Does that require additional thread synchronization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no concurrent memory access in Dart. All code in a single isolate executes on a single event-loop and isolates don't share memory.

Copy link
Contributor Author

@vaind vaind Mar 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how so? I don't see a problem with the code in this PR. You mean something unrelated, e.g. transactions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my limited understanding, I imagine this to happen:

  • Let's assume two isolates, both starting with IDs at 1 (produce colliding IDs, because isolated...)
  • Callbacks from core happen on a different thread
  • (Open question: not sure about how a "foreign" thread is mapped to an isolate. So, which isolate gets the callback?)
  • Now, I don't see how we ensured that one isolate gets only its callback in using its ID.

E.g. Isolate 2 registers a callback for ID 1, but isolate 1 gets to consume it because it also has a pending callback for ID 1.

Copy link
Contributor Author

@vaind vaind Mar 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callbacks from core happen on a different thread

They don't with objectbox-c visitors - all calls are synchronous, executed on the same thread. This approach wouldn't work for other kinds of callbacks, where you register the callback and it may happen in the future (e.g. some kind of event listeners). Not sure how that could be handled with dart, if at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's an open issue with some workarounds available dart-lang/sdk#37022

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that one is strictly synchronous and if FFI guarantees synchronous callbacks to execute in the same isolate (I assume that is defined somewhere?), we should be fine.


// called from C, forwards calls to the actual callback registered at the given ID
int _forwarder(Pointer<Void> callbackId, Pointer<Uint8> dataPtr, int size) {
if (callbackId == null || callbackId.address == 0) {
throw Exception("Data-visitor callback issued with NULL user_data (callback ID)");
}

return _callbacks[callbackId.cast<Int64>().value](dataPtr, size) ? 1 : 0;
}

/// A data visitor wrapper/forwarder to be used where obx_data_visitor is expected.
class DataVisitor {
int _id;
Pointer<Int64> _idPtr;

Pointer<NativeFunction<obx_data_visitor_native_t>> get fn => Pointer.fromFunction(_forwarder, 0);

Pointer<Void> get userData => _idPtr.cast<Void>();

DataVisitor(bool Function(Pointer<Uint8> dataPtr, int length) callback) {
// cycle through ids until we find an empty slot
_lastId++;
var initialId = _lastId;
while (_callbacks.containsKey(_lastId)) {
_lastId++;

if (initialId == _lastId) {
throw Exception("Data-visitor callbacks queue full - can't allocate another");
}
}
// register the visitor
_id = _lastId;
_callbacks[_id] = callback;

_idPtr = allocate<Int64>();
_idPtr.value = _id;
}

void close() {
// unregister the visitor
_callbacks.remove(_id);
free(_idPtr);
}
}
17 changes: 12 additions & 5 deletions lib/src/bindings/signatures.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import 'package:ffi/ffi.dart';
// common functions
typedef obx_version_native_t = Void Function(Pointer<Int32> major, Pointer<Int32> minor, Pointer<Int32> patch);
typedef obx_version_string_native_t = Pointer<Utf8> Function();
typedef obx_supports_bytes_array_native_t = Uint8 Function();

typedef obx_free_dart_t<T extends NativeType> = void Function(Pointer<T> ptr);
typedef obx_free_native_t<T extends NativeType> = Void Function(T ptr); // no Pointer<T>, code analysis fails on usage

typedef obx_data_visitor_native_t = Uint8 Function(Pointer<Void> user_data, Pointer<Uint8> data, IntPtr size);

// error info
typedef obx_last_error_code_native_t = Int32 Function();
typedef obx_last_error_message_native_t = Pointer<Utf8> Function();
Expand Down Expand Up @@ -58,6 +61,10 @@ typedef obx_box_get_native_t = Int32 Function(
Pointer<Void> box, Uint64 id, Pointer<Pointer<Uint8>> data, Pointer<IntPtr> size);
typedef obx_box_get_many_native_t = Pointer<OBX_bytes_array> Function(Pointer<Void> box, Pointer<OBX_id_array> ids);
typedef obx_box_get_all_native_t = Pointer<OBX_bytes_array> Function(Pointer<Void> box);
typedef obx_box_visit_many_native_t = Int32 Function(Pointer<Void> box, Pointer<OBX_id_array> ids,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data);
typedef obx_box_visit_all_native_t = Int32 Function(
Pointer<Void> box, Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data);
typedef obx_box_id_for_put_native_t = Uint64 Function(Pointer<Void> box, Uint64 id_or_zero);
typedef obx_box_ids_for_put_native_t = Int32 Function(Pointer<Void> box, Uint64 count, Pointer<Uint64> out_first_id);
typedef obx_box_put_native_t = Int32 Function(
Expand All @@ -76,7 +83,7 @@ typedef obx_box_is_empty_native_t = Int32 Function(Pointer<Void> box, Pointer<Ui
// typedef Pointer<Int8> -> char[]
// typedef Pointer<Int32> -> int (e.g. obx_qb_cond);

// query builider
// query builder
typedef obx_query_builder_native_t = Pointer<Void> Function(Pointer<Void> store, Uint32 entity_id);
typedef obx_query_builder_dart_t = Pointer<Void> Function(Pointer<Void> store, int entity_id);

Expand Down Expand Up @@ -147,10 +154,10 @@ typedef obx_query_count_dart_t = int Function(Pointer<Void> query, Pointer<Uint6

typedef obx_query_describe_t = Pointer<Utf8> Function(Pointer<Void> query);

typedef obx_query_visit_native_t = Int32 Function(
Pointer<Void> query, Pointer<Void> visitor, Pointer<Void> user_data, Uint64 offset, Uint64 limit);
typedef obx_query_visit_dart_t = int Function(
Pointer<Void> query, Pointer<Void> visitor, Pointer<Void> user_data, int offset, int limit);
typedef obx_query_visit_native_t = Int32 Function(Pointer<Void> query,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data, Uint64 offset, Uint64 limit);
typedef obx_query_visit_dart_t = int Function(Pointer<Void> query,
Pointer<NativeFunction<obx_data_visitor_native_t>> visitor, Pointer<Void> user_data, int offset, int limit);

// Utilities

Expand Down
55 changes: 43 additions & 12 deletions lib/src/box.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'common.dart';
import "store.dart";
import "bindings/bindings.dart";
import "bindings/constants.dart";
import "bindings/data_visitor.dart";
import "bindings/flatbuffers.dart";
import "bindings/helpers.dart";
import "bindings/structs.dart";
Expand All @@ -24,8 +25,9 @@ class Box<T> {
ModelEntity _modelEntity;
ObjectReader<T> _entityReader;
OBXFlatbuffersManager<T> _fbManager;
final bool _supportsBytesArrays;

Box(this._store) {
Box(this._store) : _supportsBytesArrays = bindings.obx_supports_bytes_array() == 1 {
EntityDefinition<T> entityDefs = _store.entityDef<T>();
_modelEntity = entityDefs.model;
_entityReader = entityDefs.reader;
Expand Down Expand Up @@ -160,34 +162,63 @@ class Box<T> {
}
}

List<T> _getMany(bool allowMissing, Pointer<OBX_bytes_array> Function() cCall) {
List<T> _getMany(
bool allowMissing, Pointer<OBX_bytes_array> Function() cGetArray, void Function(DataVisitor) cVisit) {
return _store.runInTransaction(TxMode.Read, () {
final bytesArray = cCall();
try {
return _fbManager.unmarshalArray(bytesArray, allowMissing: allowMissing);
} finally {
bindings.obx_bytes_array_free(bytesArray);
if (_supportsBytesArrays) {
final bytesArray = cGetArray();
try {
return _fbManager.unmarshalArray(bytesArray, allowMissing: allowMissing);
} finally {
bindings.obx_bytes_array_free(bytesArray);
}
} else {
final results = <T>[];
final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
if (dataPtr == null || dataPtr.address == 0 || length == 0) {
if (allowMissing) {
results.add(null);
return true;
} else {
throw Exception('Object not found');
}
}
final bytes = dataPtr.asTypedList(length);
results.add(_fbManager.unmarshal(bytes));
return true;
});

try {
cVisit(visitor);
} finally {
visitor.close();
}
return results;
}
});
}

/// Returns a list of [ids.length] Objects of type T, each corresponding to the location of its ID in [ids].
/// Non-existant IDs become null.
/// Non-existent IDs become null.
List<T> getMany(List<int> ids) {
if (ids.isEmpty) return [];

const bool allowMissing = true; // returns null if null is encountered in the data found
const bool allowMissing = true; // result includes null if an object is missing
return OBX_id_array.executeWith(
ids,
(ptr) => _getMany(allowMissing,
() => checkObxPtr(bindings.obx_box_get_many(_cBox, ptr), "failed to get many objects from box")));
(ptr) => _getMany(
allowMissing,
() => checkObxPtr(bindings.obx_box_get_many(_cBox, ptr), "failed to get many objects from box"),
(DataVisitor visitor) => checkObx(bindings.obx_box_visit_many(_cBox, ptr, visitor.fn, visitor.userData))));
}

/// Returns all stored objects in this Box.
List<T> getAll() {
const bool allowMissing = false; // throw if null is encountered in the data found
return _getMany(
allowMissing, () => checkObxPtr(bindings.obx_box_get_all(_cBox), "failed to get all objects from box"));
allowMissing,
() => checkObxPtr(bindings.obx_box_get_all(_cBox), "failed to get all objects from box"),
(DataVisitor visitor) => checkObx(bindings.obx_box_visit_all(_cBox, visitor.fn, visitor.userData)));
}

/// Returns a builder to create queries for Object matching supplied criteria.
Expand Down
25 changes: 20 additions & 5 deletions lib/src/query/query.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "../store.dart";
import "../common.dart";
import "../bindings/bindings.dart";
import "../bindings/constants.dart";
import "../bindings/data_visitor.dart";
import "../bindings/flatbuffers.dart";
import "../bindings/helpers.dart";
import "../bindings/structs.dart";
Expand Down Expand Up @@ -571,11 +572,25 @@ class Query<T> {

List<T> find({int offset = 0, int limit = 0}) {
return _store.runInTransaction(TxMode.Read, () {
final bytesArray = checkObxPtr(bindings.obx_query_find(_cQuery, offset, limit), "find");
try {
return _fbManager.unmarshalArray(bytesArray);
} finally {
bindings.obx_bytes_array_free(bytesArray);
if (bindings.obx_supports_bytes_array() == 1) {
final bytesArray = checkObxPtr(bindings.obx_query_find(_cQuery, offset, limit), "find");
try {
return _fbManager.unmarshalArray(bytesArray);
} finally {
bindings.obx_bytes_array_free(bytesArray);
}
} else {
final results = <T>[];
final visitor = DataVisitor((Pointer<Uint8> dataPtr, int length) {
final bytes = dataPtr.asTypedList(length);
results.add(_fbManager.unmarshal(bytes));
return true;
});

final err = bindings.obx_query_visit(_cQuery, visitor.fn, visitor.userData, offset, limit);
visitor.close();
checkObx(err);
return results;
}
});
}
Expand Down
24 changes: 23 additions & 1 deletion test/box_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,29 @@ void main() {
}
});

test(".getMany correctly handles non-existant items", () {
test(".getAll/getMany works on large arrays", () {
// This would fail on 32-bit system if objectbox-c obx_supports_bytes_array() wasn't respected
final length = 10 * 1000;
final largeString = 'A' * length;
expect(largeString.length, length);

box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));

List<TestEntity> items = box.getAll();
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);

box.put(TestEntity(tString: largeString));

items = box.getMany([1, 2]);
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);
});

test(".getMany correctly handles non-existent items", () {
final List<TestEntity> items = ["One", "Two"].map((s) => TestEntity(tString: s)).toList();
final List<int> ids = box.putMany(items);
int otherId = 1;
Expand Down
16 changes: 16 additions & 0 deletions test/query_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ void main() {
q.close();
});

test(".find works on large arrays", () {
// This would fail on 32-bit system if objectbox-c obx_supports_bytes_array() wasn't respected
final length = 10 * 1000;
final largeString = 'A' * length;
expect(largeString.length, length);

box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));
box.put(TestEntity(tString: largeString));

List<TestEntity> items = box.query(TestEntity_.id.lessThan(3)).build().find();
expect(items.length, 2);
expect(items[0].tString, largeString);
expect(items[1].tString, largeString);
});

test(".count items after grouping with and/or", () {
box.put(TestEntity(tString: "Hello"));
box.put(TestEntity(tString: "Goodbye"));
Expand Down