ARTEMIS-4558 Transactional support for mirroring

- Async commit
 * async here meaning the recording of the commit record is not doing a sync on the storage.
   This is useful for internal operations where we don't need an immediate sync on the journal storage.

- Wired notification
 * I need finer control on a afterWired (to the storage) and before the completions, so I can plug the sync context on mirror right before the commit is called.
This commit is contained in:
Clebert Suconic 2024-01-05 16:47:35 -05:00 committed by clebertsuconic
parent 5e7a9023d8
commit c6a6e036a4
8 changed files with 82 additions and 5 deletions

View File

@ -275,6 +275,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void commit(long txID, boolean lineUpContext) throws Exception;
void asyncCommit(long txID) throws Exception;
void rollback(long txID) throws Exception;
void rollbackBindings(long txID) throws Exception;

View File

@ -664,6 +664,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
@Override
public void asyncCommit(final long txID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendCommitRecord(txID, false, getContext(true), true);
}
}
@Override
public void rollback(final long txID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {

View File

@ -208,6 +208,10 @@ public class NullStorageManager implements StorageManager {
public void commit(final long txID) throws Exception {
}
@Override
public void asyncCommit(final long txID) {
}
@Override
public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
final List<GroupingInfo> groupingInfos,

View File

@ -74,6 +74,8 @@ public interface Transaction {
void addOperation(TransactionOperation sync);
void afterWired(Runnable runnable);
/**
* This is an operation that will be called right after the storage is completed.
* addOperation could only happen after paging and replication, while these operations will just be
@ -102,4 +104,9 @@ public interface Transaction {
void setTimeout(int timeout);
RefsOperation createRefsOperation(Queue queue, AckReason reason);
boolean isAsync();
/** To be used on control transactions that are meant as internal and don't really require a hard sync. */
Transaction setAsync(boolean async);
}

View File

@ -54,11 +54,11 @@ public class TransactionImpl implements Transaction {
protected final StorageManager storageManager;
private final Xid xid;
protected final Xid xid;
private final long id;
protected final long id;
private volatile State state = State.ACTIVE;
protected volatile State state = State.ACTIVE;
private ActiveMQException exception;
@ -66,12 +66,27 @@ public class TransactionImpl implements Transaction {
private final long createTime;
private volatile boolean containsPersistent;
protected volatile boolean containsPersistent;
private int timeoutSeconds = -1;
private Object protocolData;
private boolean async;
private Runnable afterWired;
@Override
public boolean isAsync() {
return async;
}
@Override
public TransactionImpl setAsync(boolean async) {
this.async = async;
return this;
}
@Override
public Object getProtocolData() {
return protocolData;
@ -230,6 +245,18 @@ public class TransactionImpl implements Transaction {
commit(true);
}
@Override
public void afterWired(Runnable callback) {
this.afterWired = callback;
}
private void wired() {
if (afterWired != null) {
afterWired.run();
afterWired = null;
}
}
@Override
public void commit(final boolean onePhase) throws Exception {
logger.trace("TransactionImpl::commit::{}", this);
@ -306,6 +333,7 @@ public class TransactionImpl implements Transaction {
});
}
wired();
}
}
@ -316,7 +344,11 @@ public class TransactionImpl implements Transaction {
if (containsPersistent || xid != null && state == State.PREPARED) {
// ^^ These are the scenarios where we require a storage.commit
// for anything else we won't use the journal
storageManager.commit(id);
if (async) {
storageManager.asyncCommit(id);
} else {
storageManager.commit(id);
}
}
state = State.COMMITTED;

View File

@ -329,6 +329,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void asyncCommit(long txID) throws Exception {
}
@Override
public ArtemisCloseable closeableReadLock() {
return () -> { };

View File

@ -307,6 +307,11 @@ public class SendAckFailTest extends SpawnedTestBase {
manager.stop();
}
@Override
public void asyncCommit(long txID) throws Exception {
}
@Override
public void updateQueueBinding(long tx, Binding binding) throws Exception {
manager.updateQueueBinding(tx, binding);

View File

@ -137,11 +137,26 @@ public class BindingsImplTest extends ActiveMQTestBase {
return null;
}
@Override
public boolean isAsync() {
return false;
}
@Override
public Transaction setAsync(boolean async) {
return null;
}
@Override
public void setProtocolData(Object data) {
}
@Override
public void afterWired(Runnable runnable) {
}
@Override
public void afterStore(TransactionOperation sync) {