From ed5edb03d7fe63ef27269566aaa9a9b501650eb0 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 26 Sep 2019 15:54:57 +0100 Subject: [PATCH] AMQ-7311 - track recovered prepared ack locations on a per subscriber basis, fix and test --- .../jdbc/JdbcMemoryTransactionStore.java | 2 +- .../jdbc/adapter/DefaultJDBCAdapter.java | 4 +- .../activemq/store/kahadb/KahaDBStore.java | 66 ++++-- .../store/kahadb/KahaDBTransactionStore.java | 5 + .../activemq/broker/XARecoveryBrokerTest.java | 93 +++++++- .../broker/mLevelDBXARecoveryBrokerTest.java | 2 + .../activemq/store/jdbc/XACompletionTest.java | 199 +++++++++++++++++- 7 files changed, 351 insertions(+), 20 deletions(-) diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 4bbe43d387..ccf7485013 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -294,7 +294,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { @Override public void rollback(ConnectionContext context) throws IOException { - ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId); + ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0, jdbcTopicMessageStore.getDestination(), subName, clientId); jdbcTopicMessageStore.complete(clientId, subName); } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 8d76fe683b..031d976e39 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -1008,8 +1008,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter { String encodedString = rs.getString(1); byte[] encodedXid = parseBase64Binary(encodedString); String destination = rs.getString(2); - String subName = rs.getString(3); - String subId = rs.getString(4); + String subId = rs.getString(3); + String subName = rs.getString(4); jdbcMemoryTransactionStore.recoverLastAck(encodedXid, ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE), subName, subId); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 47285ea4a5..a8af5aecd6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -419,8 +420,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, protected KahaDestination dest; private final int maxAsyncJobs; private final Semaphore localDestinationSemaphore; - protected final Set ackedAndPrepared = new HashSet<>(); - protected final Set rolledBackAcks = new HashSet<>(); + protected final HashMap> ackedAndPreparedMap = new HashMap>(); + protected final HashMap> rolledBackAcksMap = new HashMap>(); double doneTasks, canceledTasks = 0; @@ -437,6 +438,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } + private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { + return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId(); + } + // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, // till then they are skipped by the store. // 'at most once' XA guarantee @@ -444,6 +449,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, indexLock.writeLock().lock(); try { for (MessageAck ack : acks) { + final String key = recoveredTxStateMapKey(destination, ack); + Set ackedAndPrepared = ackedAndPreparedMap.get(key); + if (ackedAndPrepared == null) { + ackedAndPrepared = new LinkedHashSet(); + ackedAndPreparedMap.put(key, ackedAndPrepared); + } ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); } } finally { @@ -457,8 +468,20 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, try { for (MessageAck ack : acks) { final String id = ack.getLastMessageId().toProducerKey(); - ackedAndPrepared.remove(id); + final String key = recoveredTxStateMapKey(destination, ack); + Set ackedAndPrepared = ackedAndPreparedMap.get(key); + if (ackedAndPrepared != null) { + ackedAndPrepared.remove(id); + if (ackedAndPreparedMap.isEmpty()) { + ackedAndPreparedMap.remove(key); + } + } if (rollback) { + Set rolledBackAcks = rolledBackAcksMap.get(key); + if (rolledBackAcks == null) { + rolledBackAcks = new LinkedHashSet(); + rolledBackAcksMap.put(key, rolledBackAcks); + } rolledBackAcks.add(id); pageFile.tx().execute(tx -> { incrementAndAddSizeToStoreStat(tx, dest, 0); @@ -646,12 +669,13 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, @Override public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); - recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); + recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); sd.orderIndex.resetCursorPosition(); for (Iterator> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator .hasNext(); ) { Entry entry = iterator.next(); - if (ackedAndPrepared.contains(entry.getValue().messageId)) { + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } Message msg = loadMessage(entry.getValue().location); @@ -673,10 +697,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); Entry entry = null; - int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); + int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { entry = iterator.next(); - if (ackedAndPrepared.contains(entry.getValue().messageId)) { + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } Message msg = loadMessage(entry.getValue().location); @@ -695,9 +720,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } } - protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { + protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { int counter = 0; String id; + + Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey); + if (rolledBackAcks == null) { + return counter; + } for (Iterator iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { id = iterator.next(); iterator.remove(); @@ -710,12 +740,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, break; } } else { - LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); + LOG.debug("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); } } else { LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); } } + if (rolledBackAcks.isEmpty()) { + rolledBackAcksMap.remove(recoveredTxStateMapKey); + } return counter; } @@ -830,7 +863,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, return statistics; } }); - recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size()); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + if (ackedAndPrepared != null) { + recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size()); + } getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); } finally { @@ -1113,11 +1149,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, subAckPositions = null; sd.orderIndex.setBatch(tx, cursorPos); } - recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); + recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); - if (ackedAndPrepared.contains(entry.getValue().messageId)) { + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } //If subAckPositions is set then verify the sequence set contains the message still @@ -1173,11 +1210,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, } Entry entry = null; - int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); + int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator .hasNext();) { entry = iterator.next(); - if (ackedAndPrepared.contains(entry.getValue().messageId)) { + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { continue; } //If subAckPositions is set then verify the sequence set contains the message still diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index 8b66867c1a..b0f5c411d2 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; @@ -359,6 +360,10 @@ public class KahaDBTransactionStore implements TransactionStore { MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op; Buffer ackb = rmOp.getCommand().getAck(); MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); + // allow the ack to be tracked back to its durable sub + ConsumerId subKey = new ConsumerId(); + subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey()); + ack.setConsumerId(subKey); ackList.add(ack); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 8415b93c6e..9e174c2ff0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -713,7 +713,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { } - public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { + public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); } @@ -793,6 +793,97 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); } + public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() throws Exception { + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("sub"); + connection.send(consumerInfo); + + ConsumerInfo consumerInfoX = createConsumerInfo(sessionInfo, destination); + consumerInfoX.setSubscriptionName("subX"); + connection.send(consumerInfoX); + connection.send(consumerInfoX.createRemoveCommand()); + + final int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + final int messageCount = expectedMessageCount(numMessages, destination); + Message m = null; + for (int i = 0; i < messageCount; i++) { + m = receiveMessage(connection); + assertNotNull("unexpected null on: " + i, m); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("sub"); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics unless there is rollback + m = receiveMessage(connection); + assertNull(m); + assertNoMessagesLeft(connection); + + // ensure subX can get it's copy of the messages + consumerInfoX = createConsumerInfo(sessionInfo, destination); + consumerInfoX.setSubscriptionName("subX"); + connection.send(consumerInfoX); + + for (int i = 0; i < messageCount; i++) { + m = receiveMessage(connection); + assertNotNull("unexpected null for subX on: " + i, m); + } + + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { ActiveMQDestination destination = createDestination(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java index 7adb983a72..eb8baddfae 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java @@ -74,4 +74,6 @@ public class mLevelDBXARecoveryBrokerTest extends XARecoveryBrokerTest { } public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception { } + public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() throws Exception { + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index a0c49cba24..e203f96b5f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.filter.AnyDestination; import org.apache.activemq.filter.DestinationMap; @@ -407,6 +408,182 @@ public class XACompletionTest extends TestSupport { } + @Test + public void testConsumeAfterAckPreparedRolledbackTopic() throws Exception { + + factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); + factory.setWatchTopicAdvisories(false); + + final ActiveMQTopic destination = new ActiveMQTopic("TEST"); + + ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection(); + activeMQXAConnection.setClientID("durable"); + activeMQXAConnection.start(); + XASession xaSession = activeMQXAConnection.createXASession(); + + MessageConsumer consumer = xaSession.createDurableSubscriber(destination, "sub1"); + consumer.close(); + consumer = xaSession.createDurableSubscriber(destination, "sub2"); + + sendMessagesTo(10, destination); + + XAResource resource = xaSession.getXAResource(); + resource.recover(XAResource.TMSTARTRSCAN); + resource.recover(XAResource.TMNOFLAGS); + + dumpMessages(); + Xid tid = createXid(); + + resource.start(tid, XAResource.TMNOFLAGS); + + int messagesReceived = 0; + + for (int i = 0; i < 5; i++) { + + Message message = null; + try { + LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected); + message = consumer.receive(2000); + LOG.info("Received : " + message); + messagesReceived++; + } catch (Exception e) { + LOG.debug("Caught exception:", e); + } + } + + resource.end(tid, XAResource.TMSUCCESS); + resource.prepare(tid); + + consumer.close(); + activeMQXAConnection.close(); + + LOG.info("after close"); + + broker = restartBroker(); + + LOG.info("Try consume... after restart"); + dumpMessages(); + + factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); + factory.setWatchTopicAdvisories(false); + + activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection(); + activeMQXAConnection.start(); + xaSession = activeMQXAConnection.createXASession(); + + XAResource xaResource = xaSession.getXAResource(); + + Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN); + xaResource.recover(XAResource.TMNOFLAGS); + + LOG.info("Rollback outcome for ack"); + xaResource.rollback(xids[0]); + + assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", destination)); + assertTrue("got expected", consumeOnlyN(10, "durable", "sub2", destination)); + } + + @Test + public void testConsumeAfterAckPreparedCommitTopic() throws Exception { + + factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); + factory.setWatchTopicAdvisories(false); + + final ActiveMQTopic destination = new ActiveMQTopic("TEST"); + + ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection(); + activeMQXAConnection.setClientID("durable"); + activeMQXAConnection.start(); + XASession xaSession = activeMQXAConnection.createXASession(); + + MessageConsumer consumer = xaSession.createDurableSubscriber(destination, "sub1"); + consumer.close(); + consumer = xaSession.createDurableSubscriber(destination, "sub2"); + + sendMessagesTo(10, destination); + + XAResource resource = xaSession.getXAResource(); + resource.recover(XAResource.TMSTARTRSCAN); + resource.recover(XAResource.TMNOFLAGS); + + dumpMessages(); + Xid tid = createXid(); + + resource.start(tid, XAResource.TMNOFLAGS); + + int messagesReceived = 0; + + for (int i = 0; i < 5; i++) { + + Message message = null; + try { + LOG.debug("Receiving message " + (messagesReceived + 1) + " of " + messagesExpected); + message = consumer.receive(2000); + LOG.info("Received : " + message); + messagesReceived++; + } catch (Exception e) { + LOG.debug("Caught exception:", e); + } + } + + resource.end(tid, XAResource.TMSUCCESS); + resource.prepare(tid); + + consumer.close(); + activeMQXAConnection.close(); + + LOG.info("after close"); + + broker = restartBroker(); + + LOG.info("Try consume... after restart"); + dumpMessages(); + + factory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); + factory.setWatchTopicAdvisories(false); + + activeMQXAConnection = (ActiveMQXAConnection) factory.createXAConnection(); + activeMQXAConnection.start(); + xaSession = activeMQXAConnection.createXASession(); + + XAResource xaResource = xaSession.getXAResource(); + + Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN); + xaResource.recover(XAResource.TMNOFLAGS); + + LOG.info("Rollback outcome for ack"); + xaResource.commit(xids[0], false); + + assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", destination)); + assertTrue("got expected", consumeOnlyN(5, "durable", "sub2", destination)); + + LOG.info("at end..."); + dumpMessages(); + + } + + private boolean consumeOnlyN(int expected, String clientId, String subName, ActiveMQTopic destination) throws Exception { + int drained = 0; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + expected); + factory.setWatchTopicAdvisories(false); + javax.jms.Connection connection = factory.createConnection(); + connection.setClientID(clientId); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(destination, subName); + Message message = null; + while ( (message =consumer.receive(2000)) != null) { + drained++; + LOG.info("Sub:" + subName + ", received: " + message.getJMSMessageID()); + } + consumer.close(); + } finally { + connection.close(); + } + return drained == expected; + } + @Test public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception { @@ -938,16 +1115,24 @@ public class XACompletionTest extends TestSupport { } protected void sendMessages(int messagesExpected) throws Exception { + sendMessagesTo(messagesExpected, new ActiveMQQueue("TEST")); + } + + protected void sendMessagesTo(int messagesExpected, Destination destination) throws Exception { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(connectionUri); activeMQConnectionFactory.setWatchTopicAdvisories(false); - sendMessagesWith(activeMQConnectionFactory, messagesExpected); + sendMessagesWithTo(activeMQConnectionFactory, messagesExpected, destination); } protected void sendMessagesWith(ConnectionFactory factory, int messagesExpected) throws Exception { + sendMessagesWithTo(factory, messagesExpected, new ActiveMQQueue("TEST")); + } + + protected void sendMessagesWithTo(ConnectionFactory factory, int messagesExpected, Destination destination) throws Exception { javax.jms.Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("TEST"); + MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); @@ -975,6 +1160,15 @@ public class XACompletionTest extends TestSupport { LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message); } statement.close(); + + statement = conn.prepareStatement("SELECT LAST_ACKED_ID, CLIENT_ID, SUB_NAME, PRIORITY, XID FROM ACTIVEMQ_ACKS"); + result = statement.executeQuery(); + LOG.info("Messages in ACKS table db..."); + while (result.next()) { + LOG.info("lastAcked: {}, clientId: {}, SUB_NAME: {}, PRIORITY: {}, XID {}", + result.getLong(1), result.getString(2), result.getString(3), result.getInt(4), result.getString(5)); + } + statement.close(); conn.close(); } @@ -1011,6 +1205,7 @@ public class XACompletionTest extends TestSupport { DestinationMap destinationMap = new DestinationMap(); GroupPrincipal anaGroup = new GroupPrincipal(id); destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup); + destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQTopic(">")}), anaGroup); map.setWriteACLs(destinationMap); map.setAdminACLs(destinationMap); map.setReadACLs(destinationMap);