From e900fb41db113dc22df96402ae93da5aea4fd51a Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 27 Jun 2012 11:53:58 +0000 Subject: [PATCH] AMQ-3872 AMQ-3305 - deal with redelivery of acked messages pending transaction completion, getting dropped by subscriptions, invisible till broker restart git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1354449 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 6 +- .../activemq/broker/XARecoveryBrokerTest.java | 106 ++++++++++++++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 07c8196524..5b167f75d5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1607,6 +1607,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { @Override public void afterRollback() throws Exception { reference.setAcked(false); + wakeup(); } }); } @@ -1879,7 +1880,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { continue; } if (!fullConsumers.contains(s) && !s.isFull()) { - if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node)) { + if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { // Dispatch it. s.add(node); target = s; @@ -1894,8 +1895,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } // make sure it gets dispatched again - if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && - (!node.isDropped() || s.getConsumerInfo().isBrowser())) { + if (!node.isDropped()) { interestCount++; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 9932097425..38b08c2be6 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.management.InstanceNotFoundException; @@ -745,6 +747,110 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); } + public void testQueuePersistentPreparedAcksAvailableAfterRollbackPrefetchOne() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + int numMessages = 1; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + final int messageCount = expectedMessageCount(numMessages, destination); + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + // use consumer per destination for the composite dest case + // bc the same composite dest is used for sending so there + // will be duplicate message ids in the mix which a single + // consumer (PrefetchSubscription) cannot handle in a tx + // atm. The matching is based on messageId rather than messageId + // and destination + Set consumerInfos = new HashSet(); + for (ActiveMQDestination dest : destinationList(destination)) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + consumerInfo.setPrefetchSize(numMessages); + consumerInfos.add(consumerInfo); + } + + for (ConsumerInfo info : consumerInfos) { + connection.send(info); + } + + Message message = null; + for (ConsumerInfo info : consumerInfos) { + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE)); + } + MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // reconnect + connection.send(connectionInfo.createRemoveCommand()); + connection = createConnection(); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + connection.send(sessionInfo); + + for (ConsumerInfo info : consumerInfos) { + connection.send(info); + } + + // no redelivery, exactly once semantics while prepared + message = receiveMessage(connection); + assertNull(message); + assertNoMessagesLeft(connection); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (ConsumerInfo info : consumerInfos) { + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + MessageAck ack = createAck(info, message, 1, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + } + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() { addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); }