Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d20b3d3
Initial plan
Copilot Oct 12, 2025
5703bdf
Replace Thread.sleep with Awaitility and Thread[] with ExecutorServic…
Copilot Oct 12, 2025
508ebf1
Replace Thread.sleep with Awaitility and Thread[] with ExecutorServic…
Copilot Oct 12, 2025
e000be1
Replace Thread.sleep with Awaitility and Thread[] with ExecutorServic…
Copilot Oct 12, 2025
19ae6e1
Replace Thread.sleep with Awaitility and Thread[] with ExecutorServic…
Copilot Oct 12, 2025
6f870dd
fix pre-commit errors
robfrank Oct 12, 2025
19b0922
Fix compilation errors - remove non-existent getExecutor().getQueue()…
Copilot Oct 12, 2025
f323eb7
Fix syntax error - remove extra closing brace in RandomTestMultiThrea…
Copilot Oct 12, 2025
27a70e0
Fix RandomTestMultiThreadsTest conversion - properly replace Thread[]…
Copilot Oct 12, 2025
27cce16
Fix duplicate try block - remove extra try and its closing brace
Copilot Oct 12, 2025
47ce60f
fix compilation error
robfrank Oct 12, 2025
f1b48fb
fix: increase timeout and improve error handling in Awaitility waits …
robfrank Oct 12, 2025
ab2042f
fix: update property type assertions from Integer to Long and increas…
robfrank Oct 12, 2025
1a3d176
fix: enhance error handling and improve response structure in server …
robfrank Oct 12, 2025
4251da3
test: add end-to-end tests for Swagger UI API documentation
robfrank Oct 13, 2025
dd39eb5
fix: refactor database initialization in RemoteDateIT to ensure prope…
robfrank Oct 13, 2025
b703f47
fix: update Swagger UI test assertions for CSS and JavaScript paths, …
robfrank Oct 13, 2025
6c052c1
add logs
robfrank Oct 15, 2025
c05f5b6
fix: refactor imports and enhance error handling in test classes
robfrank Oct 15, 2025
c92b879
fix: add missing imports for concurrency utilities in ACIDTransaction…
robfrank Oct 15, 2025
d240ff3
wip
robfrank Oct 15, 2025
9c700ac
fix: clean up imports in RemoteSchema.java and remove unnecessary deb…
robfrank Oct 15, 2025
906b47c
fix: correct SQL query syntax in getTypeByBucketName method
robfrank Oct 15, 2025
8311e3e
fix: update imports and enhance error handling in RemoteHttpComponent…
robfrank Oct 15, 2025
03c61f3
fix: clean up imports and enhance code readability in LocalDatabase a…
robfrank Oct 18, 2025
942adc2
fix: optimize transaction handling and clean up redundant checks in L…
robfrank Oct 18, 2025
fab7324
fix: refactor TransactionContext and LocalTransactionExplicitLock for…
robfrank Oct 18, 2025
15fe9dc
fix: streamline initialization and null checks in TransactionContext …
robfrank Oct 18, 2025
50fdac4
revert to old impl
robfrank Oct 19, 2025
b80987d
fix: disable replication tests to prevent failures during integration
robfrank Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 39 additions & 20 deletions engine/src/main/java/com/arcadedb/database/LocalDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,33 @@
import com.arcadedb.utility.MultiIterator;
import com.arcadedb.utility.RWLockContext;

import java.io.*;
import java.nio.channels.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.logging.*;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;

/**
* Local implementation of {@link Database}. It is based on files opened on the local file system.
Expand All @@ -99,9 +118,14 @@
public class LocalDatabase extends RWLockContext implements DatabaseInternal {
public static final int EDGE_LIST_INITIAL_CHUNK_SIZE = 64;
public static final int MAX_RECOMMENDED_EDGE_LIST_CHUNK_SIZE = 8192;
private static final Set<String> SUPPORTED_FILE_EXT = Set.of(Dictionary.DICT_EXT,
LocalBucket.BUCKET_EXT, LSMTreeIndexMutable.NOTUNIQUE_INDEX_EXT, LSMTreeIndexMutable.UNIQUE_INDEX_EXT,
LSMTreeIndexCompacted.NOTUNIQUE_INDEX_EXT, LSMTreeIndexCompacted.UNIQUE_INDEX_EXT, HnswVectorIndex.FILE_EXT);
private static final Set<String> SUPPORTED_FILE_EXT = Set.of(
Dictionary.DICT_EXT,
LocalBucket.BUCKET_EXT,
LSMTreeIndexMutable.NOTUNIQUE_INDEX_EXT,
LSMTreeIndexMutable.UNIQUE_INDEX_EXT,
LSMTreeIndexCompacted.NOTUNIQUE_INDEX_EXT,
LSMTreeIndexCompacted.UNIQUE_INDEX_EXT,
HnswVectorIndex.FILE_EXT);
public final AtomicLong indexCompactions = new AtomicLong();
protected final String name;
protected final ComponentFile.MODE mode;
Expand Down Expand Up @@ -538,10 +562,10 @@ public void scanBucket(final String bucketName, final RecordCallback callback, f
public Iterator<Record> iterateType(final String typeName, final boolean polymorphic) {
stats.iterateType.incrementAndGet();

return (Iterator<Record>) executeInReadLock(() -> {
return executeInReadLock(() -> {
checkDatabaseIsOpen();
final DocumentType type = schema.getType(typeName);
final MultiIterator iter = new MultiIterator();
var type = schema.getType(typeName);
var iter = new MultiIterator<Record>();

// SET THE PROFILED LIMITS IF ANY
iter.setLimit(getResultSetLimit());
Expand Down Expand Up @@ -1637,12 +1661,7 @@ record = events.onAfterRead(record);
if (record instanceof Document document) {
final DocumentType type = document.getType();
if (type != null) {
// System.out.println("invokeAfterReadEvents for type = pre");

Record record1 = ((RecordEventsRegistry) type.getEvents()).onAfterRead(record);
// System.out.println("invokeAfterReadEvents for type = after");

return record1;
return ((RecordEventsRegistry) type.getEvents()).onAfterRead(record);
}
}
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.arcadedb.database;

import com.arcadedb.engine.Bucket;
import com.arcadedb.index.IndexInternal;
import com.arcadedb.log.LogManager;
import com.arcadedb.schema.DocumentType;

Expand Down Expand Up @@ -61,7 +62,7 @@ public LocalTransactionExplicitLock type(final String typeName) {
// Lock all indexes for this type
filesToLock.addAll(type.getAllIndexes(true).stream()
.flatMap(i -> Arrays.stream(i.getIndexesOnBuckets()))
.map(b -> b.getFileId())
.map(IndexInternal::getFileId)
.toList());

// Lock all currently involved buckets for this type
Expand Down
27 changes: 21 additions & 6 deletions engine/src/main/java/com/arcadedb/schema/LocalSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,26 @@
import com.arcadedb.serializer.json.JSONObject;
import com.arcadedb.utility.FileUtils;

import java.io.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

/**
* Local implementation of the database schema.
Expand Down Expand Up @@ -1199,6 +1213,7 @@ public synchronized void saveConfiguration() {
}

try {
LogManager.instance().log(this, Level.INFO, "Saving schema configuration to file - versionSerial = %s ", versionSerial);
versionSerial.incrementAndGet();

update(toJSON());
Expand Down
54 changes: 33 additions & 21 deletions engine/src/test/java/com/arcadedb/ACIDTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.arcadedb.schema.Schema;
import com.arcadedb.schema.Type;
import com.arcadedb.schema.VertexType;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

Expand All @@ -46,6 +47,12 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -78,13 +85,6 @@ public void testAsyncTX() {

db.async().waitCompletion();

try {
Thread.sleep(500);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
// IGNORE IT
}

} catch (final TransactionException e) {
assertThat(e.getCause() instanceof IOException).isTrue();
}
Expand Down Expand Up @@ -240,13 +240,6 @@ public Void call() throws IOException {

db.async().waitCompletion();

try {
Thread.sleep(500);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
// IGNORE IT
}

assertThat(errors.get()).isEqualTo(1);

} catch (final TransactionException e) {
Expand Down Expand Up @@ -486,10 +479,12 @@ public void testExceptionInsideTransaction() {

final int CONCURRENT_THREADS = 4;

// SPAWN ALL THE THREADS
final Thread[] threads = new Thread[CONCURRENT_THREADS];
// SPAWN ALL THE THREADS USING EXECUTORSERVICE
final ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_THREADS);
final List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < CONCURRENT_THREADS; i++) {
threads[i] = new Thread(() -> {
Future<?> future = executorService.submit(() -> {
for (int k = 0; k < TOT; ++k) {
final int id = k;

Expand All @@ -512,16 +507,33 @@ public void testExceptionInsideTransaction() {
}

});
threads[i].start();
futures.add(future);
}

// WAIT FOR ALL THE THREADS
for (int i = 0; i < CONCURRENT_THREADS; i++)
for (Future<?> future : futures) {
try {
threads[i].join();
future.get(120, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// IGNORE IT
Thread.currentThread().interrupt();
LogManager.instance().log(this, Level.WARNING, "Thread interrupted while waiting for future", e);
} catch (ExecutionException e) {
LogManager.instance().log(this, Level.WARNING, "Execution exception in future", e);
} catch (TimeoutException e) {
LogManager.instance().log(this, Level.SEVERE, "Future timed out after 120 seconds", e);
future.cancel(true);
}
}

executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}

assertThat(database.countType("Node", true)).isEqualTo(1);

Expand Down
Loading
Loading