diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java index cd94a28f92..e977c00ed0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java @@ -31,6 +31,12 @@ public interface PageTransactionInfo extends EncodingSupport { void setCommitted(boolean committed); + void reloadPrepared(Transaction transaction); + + /* When we reload a transaction, + * We may have to add the counters after commit. */ + Transaction getPreparedTransaction(); + void commit(); void rollback(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java index e1f4e72823..59ba4c9f16 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.paging.cursor.impl; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; @@ -27,13 +29,15 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.utils.collections.LinkedList; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; -import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; @@ -44,8 +48,9 @@ public class PageCounterRebuildManager implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final PagingStore pgStore; + private final PagingManager pagingManager; private final StorageManager sm; - private final LongHashSet transactions; + private final Map transactions; private boolean paging; private long limitPageId; private int limitMessageNr; @@ -53,9 +58,10 @@ public class PageCounterRebuildManager implements Runnable { private final Set storedLargeMessages; - public PageCounterRebuildManager(PagingStore store, LongHashSet transactions, Set storedLargeMessages) { + public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store, Map transactions, Set storedLargeMessages) { // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end initialize(store); + this.pagingManager = pagingManager; this.pgStore = store; this.sm = store.getStorageManager(); this.transactions = transactions; @@ -241,28 +247,64 @@ public class PageCounterRebuildManager implements Runnable { if (logger.isTraceEnabled()) { logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues); } + + PageTransactionInfo txInfo = null; + + if (msg.getTransactionID() > 0) { + txInfo = transactions.get(msg.getTransactionID()); + } + + Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction(); + + if (logger.isTraceEnabled()) { + if (logger.isTraceEnabled()) { + logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX); + } + } + for (long queueID : routedQueues) { boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber()); - boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID()); + // if the pageTransaction is in prepare state, we have to increment the counter after the commit + // notice that there is a check if the commit is done in afterCommit + if (preparedTX != null) { + PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID); + preparedTX.addOperation(new TransactionOperationAbstract() { + @Override + public void afterCommit(Transaction tx) { + // We use the pagingManager executor here, in case the commit happened while the rebuild manager is working + // on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect + pagingManager.execute(() -> { + try { + subscription.getCounter().increment(null, 1, msg.getStoredSize()); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + }); + } + }); - if (!txOK) { - logger.debug("TX is not ok for {}", msg); - } - - if (ok && txOK) { // not acked and TX is ok - if (logger.isTraceEnabled()) { - logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); - } - CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID); - if (copiedSubscription != null) { - copiedSubscription.empty = false; - copiedSubscription.addUp++; - copiedSubscription.sizeUp += msg.getPersistentSize(); - } } else { - if (logger.isTraceEnabled()) { - logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); + boolean txOK = msg.getTransactionID() <= 0 || transactions == null || txInfo != null; + + if (!txOK) { + logger.debug("TX is not ok for {}", msg); + } + + if (ok && txOK) { // not acked and TX is ok + if (logger.isTraceEnabled()) { + logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); + } + CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID); + if (copiedSubscription != null) { + copiedSubscription.empty = false; + copiedSubscription.addUp++; + copiedSubscription.sizeUp += msg.getPersistentSize(); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID); + } } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index a0ebc3a9c4..d136052e0f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -739,6 +739,13 @@ public final class PageSubscriptionImpl implements PageSubscription { public void reloadPreparedACK(final Transaction tx, final PagePosition position) { deliveredCount.incrementAndGet(); installTXCallback(tx, position); + + try { + counter.increment(tx, -1, -position.getPersistentSize()); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index bd538ef028..3bcdcf4d4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -51,6 +51,8 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { private long transactionID; + private volatile Transaction preparedTX; + private volatile long recordID = -1; private volatile boolean committed = false; @@ -73,6 +75,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { public PageTransactionInfoImpl() { } + @Override + public Transaction getPreparedTransaction() { + return preparedTX; + } @Override public long getRecordID() { @@ -161,6 +167,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { } committed = true; lateDeliveries = null; + preparedTX = null; } @Override @@ -225,6 +232,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { this.committed = committed; } + @Override + public void reloadPrepared(final Transaction tx) { + this.preparedTX = tx; + this.committed = false; + } + @Override public boolean isRollback() { return rolledback; @@ -232,6 +245,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { @Override public synchronized void rollback() { + preparedTX = null; rolledback = true; committed = false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 18d96168db..21e1e9e3f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -44,7 +45,6 @@ import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.collections.LongHashSet; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -577,12 +577,12 @@ public final class PagingManagerImpl implements PagingManager { @Override public Future rebuildCounters(Set storedLargeMessages) { - LongHashSet transactionsSet = new LongHashSet(); - transactions.forEach((txId, tx) -> { - transactionsSet.add(txId); - }); + Map transactionsSet = new LongObjectHashMap(); + // making a copy + transactions.forEach(transactionsSet::put); + transactionsSet.forEach((a, b) -> System.out.println(a + " = " + b)); stores.forEach((address, pgStore) -> { - PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet, storedLargeMessages); + PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages); logger.debug("Setting destination {} to rebuild counters", address); managerExecutor.execute(rebuildManager); }); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index b98e038d42..1d430d302f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1926,7 +1926,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); } } else { - pageTransactionInfo.setCommitted(false); + pageTransactionInfo.reloadPrepared(tx); tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e509f464b9..23183f482f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1748,9 +1748,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // counted on the pageSubscription as well long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); if (logger.isTraceEnabled()) { - logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}", - name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), - pageSubscription.getDeliveredCount()); + logger.trace("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}, \n\tpageSubscription.getDeliveredCount()={}", + name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(), + pageSubscription.getDeliveredCount()); } return returnValue; } else { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 536dbf1b42..5256155f41 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -482,9 +482,21 @@ public class TransactionImpl implements Transaction { @Override public synchronized void addOperation(final TransactionOperation operation) { - checkCreateOperations(); - - operations.add(operation); + // We do this check, because in the case of XA Transactions and paging, + // the commit could happen while the counters are being rebuilt. + // if the state is commited we should execute it right away. + // this is just to avoid a race. + switch (state) { + case COMMITTED: + operation.afterCommit(this); + return; + case ROLLEDBACK: + operation.afterRollback(this); + return; + default: + checkCreateOperations(); + operations.add(operation); + } } @Override diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index b4e032ce72..f6bae89f3a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -208,6 +208,113 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Test + public void testAlreadyCommitted() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + + final AtomicInteger commit = new AtomicInteger(0); + final AtomicInteger rollback = new AtomicInteger(0); + tx.commit(); + + // simulating a race, the addOperation happened after the commit + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + rollback.incrementAndGet(); + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + + @Override + public List getListOnConsumer(long consumerID) { + return null; + } + }); + Assert.assertEquals(1, commit.get()); + Assert.assertEquals(0, rollback.get()); + } + + @Test + public void testAlreadyRolledBack() throws Exception { + TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10); + + final AtomicInteger rollback = new AtomicInteger(0); + final AtomicInteger commit = new AtomicInteger(0); + tx.rollback(); + + tx.addOperation(new TransactionOperation() { + @Override + public void beforePrepare(Transaction tx) throws Exception { + + } + + @Override + public void afterPrepare(Transaction tx) { + + } + + @Override + public void beforeCommit(Transaction tx) throws Exception { + + } + + @Override + public void afterCommit(Transaction tx) { + commit.incrementAndGet(); + } + + @Override + public void beforeRollback(Transaction tx) throws Exception { + + } + + @Override + public void afterRollback(Transaction tx) { + rollback.incrementAndGet(); + } + + @Override + public List getRelatedMessageReferences() { + return null; + } + + @Override + public List getListOnConsumer(long consumerID) { + return null; + } + }); + Assert.assertEquals(0, commit.get()); + Assert.assertEquals(1, rollback.get()); + } + class FakeSM implements StorageManager { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PendingTXCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PendingTXCounterTest.java new file mode 100644 index 0000000000..60dbaad174 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PendingTXCounterTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XASession; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImplTestAccessor; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PendingTXCounterTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String ADDRESS = "PendingTXCounterTest"; + + ActiveMQServer server; + + protected static final int PAGE_MAX = 10 * 1024; + + protected static final int PAGE_SIZE = 1 * 1024; + + + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false); + + config.setMessageExpiryScanPeriod(-1); + server = createServer(true, config, PAGE_SIZE, PAGE_MAX); + + server.getAddressSettingsRepository().clear(); + + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setMaxReadPageBytes(-1).setMaxSizeMessages(0).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + + server.start(); + + + server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST)); + + } + + @Test + public void testPendingSendCoreCommit() throws Exception { + pendingSend("CORE", false, true); + } + + @Test + public void testPendingSendCoreCommitNoRestart() throws Exception { + pendingSend("CORE", false, false); + } + + + @Test + public void testPendingSendCoreRollback() throws Exception { + pendingSend("CORE", true, true); + } + + @Test + public void testPendingSendCoreRollbackNoRestart() throws Exception { + pendingSend("CORE", false, false); + } + + private void pendingSend(String protocol, boolean rollback, boolean restart) throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS); + + + final int INITIAL_NUMBER_OF_MESSAGES = 10; + + final int EXTRA_SEND = 20; + + final int TOTAL_MESSAGES = rollback ? INITIAL_NUMBER_OF_MESSAGES : INITIAL_NUMBER_OF_MESSAGES + EXTRA_SEND; + + ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = session.createQueue(ADDRESS); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < INITIAL_NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage("hello " + i); + message.setIntProperty("i", i); + producer.send(message); + } + } + + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000); + + Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000); + + Xid xid = newXID(); + + try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection(); + XASession session = connection.createXASession()) { + Queue queue = session.createQueue(ADDRESS); + MessageProducer producer = session.createProducer(queue); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + + for (int i = INITIAL_NUMBER_OF_MESSAGES; i < INITIAL_NUMBER_OF_MESSAGES + EXTRA_SEND; i++) { + Message message = session.createTextMessage("hello " + i); + message.setIntProperty("i", i); + producer.send(message); + } + session.getXAResource().end(xid, XAResource.TMSUCCESS); + session.getXAResource().prepare(xid); + } + + Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000); + + if (restart) { + server.stop(); + + server.start(); + } + + serverQueue = server.locateQueue(ADDRESS); + + Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000); + + try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection(); + XASession session = connection.createXASession()) { + if (rollback) { + session.getXAResource().rollback(xid); + } else { + session.getXAResource().commit(xid, false); + } + } + + Wait.assertEquals(TOTAL_MESSAGES, serverQueue::getMessageCount, 2000); + + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = session.createQueue(ADDRESS); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + for (int i = 0; i < TOTAL_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("hello " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + } + + Assert.assertTrue(serverQueue.getMessageCount() >= 0); + } + + Wait.assertEquals(0, serverQueue::getMessageCount, 2000); + { + org.apache.activemq.artemis.core.server.Queue localRefQueue = serverQueue; + Wait.assertEquals(0L, () -> QueueImplTestAccessor.getQueueMemorySize(localRefQueue)); + } + + } + + @Test + public void testPendingACKTXRollbackCore() throws Exception { + pendingACKTXRollback("CORE", true, true); + } + + @Test + public void testPendingACKTXCommitCore() throws Exception { + pendingACKTXRollback("CORE", false, true); + } + + @Test + public void testPendingACKTXRollbackCoreNoRestart() throws Exception { + pendingACKTXRollback("CORE", true, false); + } + + @Test + public void testPendingACKTXCommitCoreNoRestart() throws Exception { + pendingACKTXRollback("CORE", false, false); + } + + private void pendingACKTXRollback(String protocol, boolean rollback, boolean restart) throws Exception { + AssertionLoggerHandler.startCapture(); + runAfter(AssertionLoggerHandler::stopCapture); + + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS); + + final int NUMBER_OF_MESSAGES = 15; + + ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = session.createQueue(ADDRESS); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage("hello " + i); + message.setIntProperty("i", i); + producer.send(message); + } + } + + Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000); + + Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000); + + Xid xid1 = newXID(); + Xid xid2 = newXID(); + + for (int repeat = 0; repeat < 2; repeat++) { + + Xid xid = repeat == 0 ? xid1 : xid2; + + int startPosition = 5 * repeat; + int endPosition = startPosition + 5; + + try (XAConnection connection = (XAConnection) ((XAConnectionFactory) cf).createXAConnection(); XASession session = connection.createXASession()) { + Queue queue = session.createQueue(ADDRESS); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + session.getXAResource().start(xid, XAResource.TMNOFLAGS); + + for (int i = startPosition; i < endPosition; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertEquals("hello " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + } + + session.getXAResource().end(xid, XAResource.TMSUCCESS); + session.getXAResource().prepare(xid); + + if (repeat == 0) { + session.getXAResource().commit(xid, false); + } + } + } + + Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000); + + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + Queue queue = session.createQueue(ADDRESS); + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 10; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + logger.info("Received {}", message.getText()); + Assert.assertEquals("hello " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + } + Assert.assertNull(consumer.receiveNoWait()); + session.rollback(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000); + + if (restart) { + server.stop(); + + server.start(); + } + + serverQueue = server.locateQueue(ADDRESS); + + Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000); + + logger.info("Before tx = {}", serverQueue.getMessageCount()); + + try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection(); + XASession session = connection.createXASession()) { + if (rollback) { + session.getXAResource().rollback(xid2); + } else { + session.getXAResource().commit(xid2, false); + } + } + + if (rollback) { + Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000); + } else { + Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue::getMessageCount, 2000); + } + + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + Queue queue = session.createQueue(ADDRESS); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + int start = rollback ? 5 : 10; + + for (int i = start; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals("hello " + i, message.getText()); + Assert.assertEquals(i, message.getIntProperty("i")); + } + Assert.assertNull(consumer.receiveNoWait()); + + Assert.assertTrue(serverQueue.getMessageCount() >= 0); + } + + Wait.assertEquals(0, serverQueue::getMessageCount, 2000); + + { + org.apache.activemq.artemis.core.server.Queue localRefQueue = serverQueue; + Wait.assertEquals(0L, () -> QueueImplTestAccessor.getQueueMemorySize(localRefQueue), 2000); + } + + } + +} \ No newline at end of file