diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 227bab6d56..6722b6812d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -275,6 +275,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void commit(long txID, boolean lineUpContext) throws Exception; + void asyncCommit(long txID) throws Exception; + void rollback(long txID) throws Exception; void rollbackBindings(long txID) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index e7964e4f73..4c6a5cdb13 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -664,6 +664,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + @Override + public void asyncCommit(final long txID) throws Exception { + try (ArtemisCloseable lock = closeableReadLock()) { + messageJournal.appendCommitRecord(txID, false, getContext(true), true); + } + } + @Override public void rollback(final long txID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 6b591fbc38..11538e4f0d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -208,6 +208,10 @@ public class NullStorageManager implements StorageManager { public void commit(final long txID) throws Exception { } + @Override + public void asyncCommit(final long txID) { + } + @Override public JournalLoadInformation loadBindingJournal(final List queueBindingInfos, final List groupingInfos, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java index 41aa7ed16a..befaaf3405 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java @@ -74,6 +74,8 @@ public interface Transaction { void addOperation(TransactionOperation sync); + void afterWired(Runnable runnable); + /** * This is an operation that will be called right after the storage is completed. * addOperation could only happen after paging and replication, while these operations will just be @@ -102,4 +104,9 @@ public interface Transaction { void setTimeout(int timeout); RefsOperation createRefsOperation(Queue queue, AckReason reason); + + boolean isAsync(); + + /** To be used on control transactions that are meant as internal and don't really require a hard sync. */ + Transaction setAsync(boolean async); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index f619c289f5..b371e6745f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -54,11 +54,11 @@ public class TransactionImpl implements Transaction { protected final StorageManager storageManager; - private final Xid xid; + protected final Xid xid; - private final long id; + protected final long id; - private volatile State state = State.ACTIVE; + protected volatile State state = State.ACTIVE; private ActiveMQException exception; @@ -66,12 +66,27 @@ public class TransactionImpl implements Transaction { private final long createTime; - private volatile boolean containsPersistent; + protected volatile boolean containsPersistent; private int timeoutSeconds = -1; private Object protocolData; + private boolean async; + + private Runnable afterWired; + + @Override + public boolean isAsync() { + return async; + } + + @Override + public TransactionImpl setAsync(boolean async) { + this.async = async; + return this; + } + @Override public Object getProtocolData() { return protocolData; @@ -230,6 +245,18 @@ public class TransactionImpl implements Transaction { commit(true); } + @Override + public void afterWired(Runnable callback) { + this.afterWired = callback; + } + + private void wired() { + if (afterWired != null) { + afterWired.run(); + afterWired = null; + } + } + @Override public void commit(final boolean onePhase) throws Exception { logger.trace("TransactionImpl::commit::{}", this); @@ -306,6 +333,7 @@ public class TransactionImpl implements Transaction { }); } + wired(); } } @@ -316,7 +344,11 @@ public class TransactionImpl implements Transaction { if (containsPersistent || xid != null && state == State.PREPARED) { // ^^ These are the scenarios where we require a storage.commit // for anything else we won't use the journal - storageManager.commit(id); + if (async) { + storageManager.asyncCommit(id); + } else { + storageManager.commit(id); + } } state = State.COMMITTED; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index a07f281311..e201e65006 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -329,6 +329,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public void asyncCommit(long txID) throws Exception { + + } + @Override public ArtemisCloseable closeableReadLock() { return () -> { }; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 40d1d554fd..5aeedf679d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -307,6 +307,11 @@ public class SendAckFailTest extends SpawnedTestBase { manager.stop(); } + @Override + public void asyncCommit(long txID) throws Exception { + + } + @Override public void updateQueueBinding(long tx, Binding binding) throws Exception { manager.updateQueueBinding(tx, binding); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 26a37c866a..f833340a0d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -137,11 +137,26 @@ public class BindingsImplTest extends ActiveMQTestBase { return null; } + @Override + public boolean isAsync() { + return false; + } + + @Override + public Transaction setAsync(boolean async) { + return null; + } + @Override public void setProtocolData(Object data) { } + @Override + public void afterWired(Runnable runnable) { + + } + @Override public void afterStore(TransactionOperation sync) {