Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.scalar.db.api;

import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager;
import com.scalar.db.common.ActiveTransactionManagedTwoPhaseCommitTransactionManager;
import com.scalar.db.common.StateManagedDistributedTransactionManager;
import com.scalar.db.common.StateManagedTwoPhaseCommitTransactionManager;
import com.scalar.db.config.DatabaseConfig;
import javax.annotation.Nullable;

public abstract class AbstractDistributedTransactionProvider
implements DistributedTransactionProvider {

@Override
public DistributedTransactionManager createDistributedTransactionManager(DatabaseConfig config) {
DistributedTransactionManager transactionManager =
createRawDistributedTransactionManager(config);

// Wrap the transaction manager for state management
transactionManager = new StateManagedDistributedTransactionManager(transactionManager);

if (config.isActiveTransactionManagementEnabled()) {
// Wrap the transaction manager for active transaction management
transactionManager =
new ActiveTransactionManagedDistributedTransactionManager(
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
}

return transactionManager;
}

protected abstract DistributedTransactionManager createRawDistributedTransactionManager(
DatabaseConfig config);

@Nullable
@Override
public TwoPhaseCommitTransactionManager createTwoPhaseCommitTransactionManager(
DatabaseConfig config) {
TwoPhaseCommitTransactionManager transactionManager =
createRawTwoPhaseCommitTransactionManager(config);

if (transactionManager == null) {
return null;
}

// Wrap the transaction manager for state management
transactionManager = new StateManagedTwoPhaseCommitTransactionManager(transactionManager);

if (config.isActiveTransactionManagementEnabled()) {
// Wrap the transaction manager for active transaction management
transactionManager =
new ActiveTransactionManagedTwoPhaseCommitTransactionManager(
transactionManager, config.getActiveTransactionManagementExpirationTimeMillis());
}

return transactionManager;
}

protected abstract TwoPhaseCommitTransactionManager createRawTwoPhaseCommitTransactionManager(
DatabaseConfig config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.TransactionNotFoundException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.util.ActiveExpiringMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,53 +31,24 @@
public class ActiveTransactionManagedDistributedTransactionManager
extends DecoratedDistributedTransactionManager {

private static final long TRANSACTION_EXPIRATION_INTERVAL_MILLIS = 1000;

private static final Logger logger =
LoggerFactory.getLogger(ActiveTransactionManagedDistributedTransactionManager.class);

private final ActiveExpiringMap<String, ActiveTransaction> activeTransactions;

private final AtomicReference<BiConsumer<String, DistributedTransaction>>
transactionExpirationHandler =
new AtomicReference<>(
(id, t) -> {
try {
t.rollback();
} catch (Exception e) {
logger.warn("Rollback failed. Transaction ID: {}", id, e);
}
});
private final ActiveTransactionRegistry<DistributedTransaction> registry;

public ActiveTransactionManagedDistributedTransactionManager(
DistributedTransactionManager transactionManager,
long activeTransactionManagementExpirationTimeMillis) {
DistributedTransactionManager transactionManager, long expirationTimeMillis) {
super(transactionManager);
activeTransactions =
new ActiveExpiringMap<>(
activeTransactionManagementExpirationTimeMillis,
TRANSACTION_EXPIRATION_INTERVAL_MILLIS,
(id, t) -> {
logger.warn("The transaction is expired. Transaction ID: {}", id);
transactionExpirationHandler.get().accept(id, t);
});
}

@Override
public void setTransactionExpirationHandler(BiConsumer<String, DistributedTransaction> handler) {
transactionExpirationHandler.set(handler);
registry =
new ActiveTransactionRegistry<>(expirationTimeMillis, DistributedTransaction::rollback);
}

private void add(ActiveTransaction transaction) throws TransactionException {
if (activeTransactions.putIfAbsent(transaction.getId(), transaction).isPresent()) {
transaction.rollback();
throw new TransactionException(
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), transaction.getId());
}
}

private void remove(String transactionId) {
activeTransactions.remove(transactionId);
public ActiveTransactionManagedDistributedTransactionManager(
DistributedTransactionManager transactionManager,
long expirationTimeMillis,
ActiveTransactionRegistry.TransactionRollback<DistributedTransaction> rollbackFunction) {
super(transactionManager);
registry = new ActiveTransactionRegistry<>(expirationTimeMillis, rollbackFunction);
}

@Override
Expand All @@ -98,7 +64,7 @@ public DistributedTransaction join(String txId) throws TransactionNotFoundExcept

@Override
public DistributedTransaction resume(String txId) throws TransactionNotFoundException {
return activeTransactions
return registry
.get(txId)
.orElseThrow(
() ->
Expand All @@ -117,7 +83,18 @@ class ActiveTransaction extends DecoratedDistributedTransaction {
@SuppressFBWarnings("EI_EXPOSE_REP2")
private ActiveTransaction(DistributedTransaction transaction) throws TransactionException {
super(transaction);
add(this);
if (!registry.add(getId(), this)) {
try {
transaction.rollback();
} catch (RollbackException e) {
logger.warn(
"Rollback failed during duplicate transaction handling. Transaction ID: {}",
getId(),
e);
}
throw new TransactionException(
CoreError.TRANSACTION_ALREADY_EXISTS.buildMessage(), getId());
}
}

@Override
Expand All @@ -132,37 +109,7 @@ public synchronized List<Result> scan(Scan scan) throws CrudException {

@Override
public synchronized Scanner getScanner(Scan scan) throws CrudException {
Scanner scanner = super.getScanner(scan);
return new Scanner() {
@Override
public Optional<Result> one() throws CrudException {
synchronized (ActiveTransaction.this) {
return scanner.one();
}
}

@Override
public List<Result> all() throws CrudException {
synchronized (ActiveTransaction.this) {
return scanner.all();
}
}

@Override
public void close() throws CrudException {
synchronized (ActiveTransaction.this) {
scanner.close();
}
}

@Nonnull
@Override
public Iterator<Result> iterator() {
synchronized (ActiveTransaction.this) {
return scanner.iterator();
}
}
};
return new SynchronizedScanner(this, super.getScanner(scan));
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand Down Expand Up @@ -220,15 +167,15 @@ public synchronized List<BatchResult> batch(List<? extends Operation> operations
@Override
public synchronized void commit() throws CommitException, UnknownTransactionStatusException {
super.commit();
remove(getId());
registry.remove(getId());
}

@Override
public synchronized void rollback() throws RollbackException {
try {
super.rollback();
} finally {
remove(getId());
registry.remove(getId());
}
}

Expand All @@ -237,7 +184,7 @@ public synchronized void abort() throws AbortException {
try {
super.abort();
} finally {
remove(getId());
registry.remove(getId());
}
}
}
Expand Down
Loading