ARTEMIS-4278 Incorrect Paging Counters with Prepared Transactions

This commit is contained in:
Clebert Suconic 2023-05-10 16:54:22 -04:00 committed by clebertsuconic
parent 5320bd03b3
commit bea39f6692
10 changed files with 577 additions and 33 deletions

View File

@ -31,6 +31,12 @@ public interface PageTransactionInfo extends EncodingSupport {
void setCommitted(boolean committed); void setCommitted(boolean committed);
void reloadPrepared(Transaction transaction);
/* When we reload a transaction,
* We may have to add the counters after commit. */
Transaction getPreparedTransaction();
void commit(); void commit();
void rollback(); void rollback();

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.LongObjectHashMap; import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage; import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
@ -27,13 +29,15 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.utils.collections.LinkedList; import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@ -44,8 +48,9 @@ public class PageCounterRebuildManager implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final PagingStore pgStore; private final PagingStore pgStore;
private final PagingManager pagingManager;
private final StorageManager sm; private final StorageManager sm;
private final LongHashSet transactions; private final Map<Long, PageTransactionInfo> transactions;
private boolean paging; private boolean paging;
private long limitPageId; private long limitPageId;
private int limitMessageNr; private int limitMessageNr;
@ -53,9 +58,10 @@ public class PageCounterRebuildManager implements Runnable {
private final Set<Long> storedLargeMessages; private final Set<Long> storedLargeMessages;
public PageCounterRebuildManager(PagingStore store, LongHashSet transactions, Set<Long> storedLargeMessages) { public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store, Map<Long, PageTransactionInfo> transactions, Set<Long> storedLargeMessages) {
// we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
initialize(store); initialize(store);
this.pagingManager = pagingManager;
this.pgStore = store; this.pgStore = store;
this.sm = store.getStorageManager(); this.sm = store.getStorageManager();
this.transactions = transactions; this.transactions = transactions;
@ -241,28 +247,64 @@ public class PageCounterRebuildManager implements Runnable {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues); logger.trace("reading message for rebuild cursor on address={}, pg={}, messageNR={}, routedQueues={}, message={}, queueLIst={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg, routedQueues);
} }
PageTransactionInfo txInfo = null;
if (msg.getTransactionID() > 0) {
txInfo = transactions.get(msg.getTransactionID());
}
Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction();
if (logger.isTraceEnabled()) {
if (logger.isTraceEnabled()) {
logger.trace("lookup on {}, tx={}, preparedTX={}", msg.getTransactionID(), txInfo, preparedTX);
}
}
for (long queueID : routedQueues) { for (long queueID : routedQueues) {
boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber()); boolean ok = !isACK(queueID, msg.getPageNumber(), msg.getMessageNumber());
boolean txOK = msg.getTransactionID() <= 0 || transactions == null || transactions.contains(msg.getTransactionID()); // if the pageTransaction is in prepare state, we have to increment the counter after the commit
// notice that there is a check if the commit is done in afterCommit
if (preparedTX != null) {
PageSubscription subscription = pgStore.getCursorProvider().getSubscription(queueID);
preparedTX.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
// We use the pagingManager executor here, in case the commit happened while the rebuild manager is working
// on that case the increment will wait any pending tasks on that executor to finish before this executor takes effect
pagingManager.execute(() -> {
try {
subscription.getCounter().increment(null, 1, msg.getStoredSize());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
});
}
});
if (!txOK) {
logger.debug("TX is not ok for {}", msg);
}
if (ok && txOK) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else { } else {
if (logger.isTraceEnabled()) { boolean txOK = msg.getTransactionID() <= 0 || transactions == null || txInfo != null;
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
if (!txOK) {
logger.debug("TX is not ok for {}", msg);
}
if (ok && txOK) { // not acked and TX is ok
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} NOT acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
if (copiedSubscription != null) {
copiedSubscription.empty = false;
copiedSubscription.addUp++;
copiedSubscription.sizeUp += msg.getPersistentSize();
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("Message pageNumber={}/{} IS acked on queue {}", msg.getPageNumber(), msg.getMessageNumber(), queueID);
}
} }
} }
} }

View File

@ -739,6 +739,13 @@ public final class PageSubscriptionImpl implements PageSubscription {
public void reloadPreparedACK(final Transaction tx, final PagePosition position) { public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
deliveredCount.incrementAndGet(); deliveredCount.incrementAndGet();
installTXCallback(tx, position); installTXCallback(tx, position);
try {
counter.increment(tx, -1, -position.getPersistentSize());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
} }
@Override @Override

View File

@ -51,6 +51,8 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
private long transactionID; private long transactionID;
private volatile Transaction preparedTX;
private volatile long recordID = -1; private volatile long recordID = -1;
private volatile boolean committed = false; private volatile boolean committed = false;
@ -73,6 +75,10 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
public PageTransactionInfoImpl() { public PageTransactionInfoImpl() {
} }
@Override
public Transaction getPreparedTransaction() {
return preparedTX;
}
@Override @Override
public long getRecordID() { public long getRecordID() {
@ -161,6 +167,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
} }
committed = true; committed = true;
lateDeliveries = null; lateDeliveries = null;
preparedTX = null;
} }
@Override @Override
@ -225,6 +232,12 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
this.committed = committed; this.committed = committed;
} }
@Override
public void reloadPrepared(final Transaction tx) {
this.preparedTX = tx;
this.committed = false;
}
@Override @Override
public boolean isRollback() { public boolean isRollback() {
return rolledback; return rolledback;
@ -232,6 +245,7 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
@Override @Override
public synchronized void rollback() { public synchronized void rollback() {
preparedTX = null;
rolledback = true; rolledback = true;
committed = false; committed = false;

View File

@ -29,6 +29,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
@ -44,7 +45,6 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -577,12 +577,12 @@ public final class PagingManagerImpl implements PagingManager {
@Override @Override
public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) { public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
LongHashSet transactionsSet = new LongHashSet(); Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
transactions.forEach((txId, tx) -> { // making a copy
transactionsSet.add(txId); transactions.forEach(transactionsSet::put);
}); transactionsSet.forEach((a, b) -> System.out.println(a + " = " + b));
stores.forEach((address, pgStore) -> { stores.forEach((address, pgStore) -> {
PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(pgStore, transactionsSet, storedLargeMessages); PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages);
logger.debug("Setting destination {} to rebuild counters", address); logger.debug("Setting destination {} to rebuild counters", address);
managerExecutor.execute(rebuildManager); managerExecutor.execute(rebuildManager);
}); });

View File

@ -1926,7 +1926,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
} }
} else { } else {
pageTransactionInfo.setCommitted(false); pageTransactionInfo.reloadPrepared(tx);
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);

View File

@ -1748,9 +1748,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// counted on the pageSubscription as well // counted on the pageSubscription as well
long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount(); long returnValue = (long) pendingMetrics.getMessageCount() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Queue={}/{} returning getMessageCount returning {}. pendingMetrics.getMessageCount() = {}, getScheduledCount() = {}, pageSubscription.getMessageCount()={}, pageSubscription.getDeliveredCount()={}", logger.trace("Queue={}/{} returning getMessageCount \n\treturning {}. \n\tpendingMetrics.getMessageCount() = {}, \n\tgetScheduledCount() = {}, \n\tpageSubscription.getMessageCount()={}, \n\tpageSubscription.getCounter().getValue()={}, \n\tpageSubscription.getDeliveredCount()={}",
name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), name, id, returnValue, pendingMetrics.getMessageCount(), getScheduledCount(), pageSubscription.getMessageCount(), pageSubscription.getCounter().getValue(),
pageSubscription.getDeliveredCount()); pageSubscription.getDeliveredCount());
} }
return returnValue; return returnValue;
} else { } else {

View File

@ -482,9 +482,21 @@ public class TransactionImpl implements Transaction {
@Override @Override
public synchronized void addOperation(final TransactionOperation operation) { public synchronized void addOperation(final TransactionOperation operation) {
checkCreateOperations(); // We do this check, because in the case of XA Transactions and paging,
// the commit could happen while the counters are being rebuilt.
operations.add(operation); // if the state is commited we should execute it right away.
// this is just to avoid a race.
switch (state) {
case COMMITTED:
operation.afterCommit(this);
return;
case ROLLEDBACK:
operation.afterRollback(this);
return;
default:
checkCreateOperations();
operations.add(operation);
}
} }
@Override @Override

View File

@ -208,6 +208,113 @@ public class TransactionImplTest extends ActiveMQTestBase {
} }
@Test
public void testAlreadyCommitted() throws Exception {
TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
final AtomicInteger commit = new AtomicInteger(0);
final AtomicInteger rollback = new AtomicInteger(0);
tx.commit();
// simulating a race, the addOperation happened after the commit
tx.addOperation(new TransactionOperation() {
@Override
public void beforePrepare(Transaction tx) throws Exception {
}
@Override
public void afterPrepare(Transaction tx) {
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
}
@Override
public void afterCommit(Transaction tx) {
commit.incrementAndGet();
}
@Override
public void beforeRollback(Transaction tx) throws Exception {
}
@Override
public void afterRollback(Transaction tx) {
rollback.incrementAndGet();
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
@Override
public List<MessageReference> getListOnConsumer(long consumerID) {
return null;
}
});
Assert.assertEquals(1, commit.get());
Assert.assertEquals(0, rollback.get());
}
@Test
public void testAlreadyRolledBack() throws Exception {
TransactionImpl tx = new TransactionImpl(newXID(), new FakeSM(), 10);
final AtomicInteger rollback = new AtomicInteger(0);
final AtomicInteger commit = new AtomicInteger(0);
tx.rollback();
tx.addOperation(new TransactionOperation() {
@Override
public void beforePrepare(Transaction tx) throws Exception {
}
@Override
public void afterPrepare(Transaction tx) {
}
@Override
public void beforeCommit(Transaction tx) throws Exception {
}
@Override
public void afterCommit(Transaction tx) {
commit.incrementAndGet();
}
@Override
public void beforeRollback(Transaction tx) throws Exception {
}
@Override
public void afterRollback(Transaction tx) {
rollback.incrementAndGet();
}
@Override
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
@Override
public List<MessageReference> getListOnConsumer(long consumerID) {
return null;
}
});
Assert.assertEquals(0, commit.get());
Assert.assertEquals(1, rollback.get());
}
class FakeSM implements StorageManager { class FakeSM implements StorageManager {
@Override @Override

View File

@ -0,0 +1,356 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImplTestAccessor;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PendingTXCounterTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String ADDRESS = "PendingTXCounterTest";
ActiveMQServer server;
protected static final int PAGE_MAX = 10 * 1024;
protected static final int PAGE_SIZE = 1 * 1024;
@Before
@Override
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
config.setMessageExpiryScanPeriod(-1);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.getAddressSettingsRepository().clear();
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setMaxReadPageBytes(-1).setMaxSizeMessages(0).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
}
@Test
public void testPendingSendCoreCommit() throws Exception {
pendingSend("CORE", false, true);
}
@Test
public void testPendingSendCoreCommitNoRestart() throws Exception {
pendingSend("CORE", false, false);
}
@Test
public void testPendingSendCoreRollback() throws Exception {
pendingSend("CORE", true, true);
}
@Test
public void testPendingSendCoreRollbackNoRestart() throws Exception {
pendingSend("CORE", false, false);
}
private void pendingSend(String protocol, boolean rollback, boolean restart) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS);
final int INITIAL_NUMBER_OF_MESSAGES = 10;
final int EXTRA_SEND = 20;
final int TOTAL_MESSAGES = rollback ? INITIAL_NUMBER_OF_MESSAGES : INITIAL_NUMBER_OF_MESSAGES + EXTRA_SEND;
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < INITIAL_NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage("hello " + i);
message.setIntProperty("i", i);
producer.send(message);
}
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000);
Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
Xid xid = newXID();
try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection();
XASession session = connection.createXASession()) {
Queue queue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(queue);
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = INITIAL_NUMBER_OF_MESSAGES; i < INITIAL_NUMBER_OF_MESSAGES + EXTRA_SEND; i++) {
Message message = session.createTextMessage("hello " + i);
message.setIntProperty("i", i);
producer.send(message);
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().prepare(xid);
}
Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
if (restart) {
server.stop();
server.start();
}
serverQueue = server.locateQueue(ADDRESS);
Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection();
XASession session = connection.createXASession()) {
if (rollback) {
session.getXAResource().rollback(xid);
} else {
session.getXAResource().commit(xid, false);
}
}
Wait.assertEquals(TOTAL_MESSAGES, serverQueue::getMessageCount, 2000);
try (Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(ADDRESS);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < TOTAL_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
Assert.assertTrue(serverQueue.getMessageCount() >= 0);
}
Wait.assertEquals(0, serverQueue::getMessageCount, 2000);
{
org.apache.activemq.artemis.core.server.Queue localRefQueue = serverQueue;
Wait.assertEquals(0L, () -> QueueImplTestAccessor.getQueueMemorySize(localRefQueue));
}
}
@Test
public void testPendingACKTXRollbackCore() throws Exception {
pendingACKTXRollback("CORE", true, true);
}
@Test
public void testPendingACKTXCommitCore() throws Exception {
pendingACKTXRollback("CORE", false, true);
}
@Test
public void testPendingACKTXRollbackCoreNoRestart() throws Exception {
pendingACKTXRollback("CORE", true, false);
}
@Test
public void testPendingACKTXCommitCoreNoRestart() throws Exception {
pendingACKTXRollback("CORE", false, false);
}
private void pendingACKTXRollback(String protocol, boolean rollback, boolean restart) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS);
final int NUMBER_OF_MESSAGES = 15;
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(ADDRESS);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage("hello " + i);
message.setIntProperty("i", i);
producer.send(message);
}
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
Xid xid1 = newXID();
Xid xid2 = newXID();
for (int repeat = 0; repeat < 2; repeat++) {
Xid xid = repeat == 0 ? xid1 : xid2;
int startPosition = 5 * repeat;
int endPosition = startPosition + 5;
try (XAConnection connection = (XAConnection) ((XAConnectionFactory) cf).createXAConnection(); XASession session = connection.createXASession()) {
Queue queue = session.createQueue(ADDRESS);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = startPosition; i < endPosition; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().prepare(xid);
if (repeat == 0) {
session.getXAResource().commit(xid, false);
}
}
}
Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000);
try (Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
Queue queue = session.createQueue(ADDRESS);
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 10; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
logger.info("Received {}", message.getText());
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
Assert.assertNull(consumer.receiveNoWait());
session.rollback();
}
Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000);
if (restart) {
server.stop();
server.start();
}
serverQueue = server.locateQueue(ADDRESS);
Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000);
logger.info("Before tx = {}", serverQueue.getMessageCount());
try (XAConnection connection = (XAConnection) ((XAConnectionFactory)cf).createXAConnection();
XASession session = connection.createXASession()) {
if (rollback) {
session.getXAResource().rollback(xid2);
} else {
session.getXAResource().commit(xid2, false);
}
}
if (rollback) {
Wait.assertEquals(NUMBER_OF_MESSAGES - 5, serverQueue::getMessageCount, 2000);
} else {
Wait.assertEquals(NUMBER_OF_MESSAGES - 10, serverQueue::getMessageCount, 2000);
}
try (Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(ADDRESS);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
int start = rollback ? 5 : 10;
for (int i = start; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
Assert.assertNull(consumer.receiveNoWait());
Assert.assertTrue(serverQueue.getMessageCount() >= 0);
}
Wait.assertEquals(0, serverQueue::getMessageCount, 2000);
{
org.apache.activemq.artemis.core.server.Queue localRefQueue = serverQueue;
Wait.assertEquals(0L, () -> QueueImplTestAccessor.getQueueMemorySize(localRefQueue), 2000);
}
}
}