diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a256f8a628..aee663a3e8 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -121,7 +121,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // The are the messages that were delivered to the consumer but that have // not been acknowledged. It's kept in reverse order since we // Always walk list in reverse order. - private final LinkedList deliveredMessages = new LinkedList(); + protected final LinkedList deliveredMessages = new LinkedList(); // track duplicate deliveries in a transaction such that the tx integrity can be validated private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter; @@ -143,7 +143,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService; private MessageTransformer transformer; - private boolean clearDispatchList; + private boolean clearDeliveredList; AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); private MessageAck pendingAck; @@ -704,7 +704,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void inProgressClearRequired() { inProgressClearRequiredFlag.incrementAndGet(); // deal with delivered messages async to avoid lock contention with in progress acks - clearDispatchList = true; + clearDeliveredList = true; } void clearMessagesInProgress() { @@ -730,6 +730,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } + clearDeliveredList(); } void deliverAcks() { @@ -818,6 +819,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!this.info.isBrowser()) { for (MessageDispatch old : list) { // ensure we don't filter this as a duplicate + if (LOG.isDebugEnabled()) { + LOG.debug("on close, rollback: " + old.getMessage().getMessageId()); + } session.connection.rollbackDuplicate(this, old.getMessage()); } } @@ -838,7 +842,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * broker to pull a message we are about to receive */ protected void sendPullCommand(long timeout) throws JMSException { - clearDispatchList(); + clearDeliveredList(); if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); @@ -1010,6 +1014,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) { + if (LOG.isDebugEnabled()) { + LOG.debug("ackLater: sending: " + pendingAck); + } session.sendAck(pendingAck); pendingAck=null; deliveredCounter = 0; @@ -1025,7 +1032,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC @Override public void beforeEnd() throws Exception { if (transactedIndividualAck) { - clearDispatchList(); + clearDeliveredList(); waitForRedeliveries(); synchronized(deliveredMessages) { rollbackOnFailedRecoveryRedelivery(); @@ -1058,7 +1065,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * @throws JMSException */ public void acknowledge() throws JMSException { - clearDispatchList(); + clearDeliveredList(); waitForRedeliveries(); synchronized(deliveredMessages) { // Acknowledge all messages so far. @@ -1162,7 +1169,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } public void rollback() throws JMSException { - clearDispatchList(); + clearDeliveredList(); synchronized (unconsumedMessages.getMutex()) { if (optimizeAcknowledge) { // remove messages read but not acked at the broker yet through @@ -1301,6 +1308,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (previouslyDeliveredMessages != null) { for (Entry entry: previouslyDeliveredMessages.entrySet()) { if (!entry.getValue()) { + if (LOG.isTraceEnabled()) { + LOG.trace("rollback non redelivered: " + entry.getKey()); + } removeFromDeliveredMessages(entry.getKey()); } } @@ -1338,7 +1348,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageListener listener = this.messageListener.get(); try { clearMessagesInProgress(); - clearDispatchList(); + clearDeliveredList(); synchronized (unconsumedMessages.getMutex()) { if (!unconsumedMessages.isClosed()) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { @@ -1375,10 +1385,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } else { if (!session.isTransacted()) { - LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId() - + " to consumer: " + getConsumerId() + ", ignoring (auto acking) duplicate: " + md); - MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); - session.sendAck(ack); + LOG.warn("Duplicate non transacted dispatch to consumer: " + getConsumerId() + ", poison acking: " + md); + MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); + poisonAck.setFirstMessageId(md.getMessage().getMessageId()); + poisonAck.setPoisonCause(new Throwable("Duplicate non transacted delivery to " + getConsumerId())); + session.sendAck(poisonAck); } else { if (LOG.isDebugEnabled()) { LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage()); @@ -1423,22 +1434,35 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again - private void clearDispatchList() { - if (clearDispatchList) { + private void clearDeliveredList() { + if (clearDeliveredList) { synchronized (deliveredMessages) { - if (clearDispatchList) { + if (clearDeliveredList) { if (!deliveredMessages.isEmpty()) { if (session.isTransacted()) { - if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt"); - } + if (previouslyDeliveredMessages == null) { previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.getTransactionContext().getTransactionId()); } for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); } + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " tracking existing transacted " + previouslyDeliveredMessages.transactionId + + " delivered list (" + deliveredMessages.size() + ") on transport interrupt"); + } } else { + if (session.isClientAcknowledge()) { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " rolling back delivered list (" + deliveredMessages.size() + ") on transport interrupt"); + } + // allow redelivery + if (!this.info.isBrowser()) { + for (MessageDispatch md: deliveredMessages) { + this.session.connection.rollbackDuplicate(this, md.getMessage()); + } + } + } if (LOG.isDebugEnabled()) { LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); } @@ -1446,7 +1470,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC pendingAck = null; } } - clearDispatchList = false; + clearDeliveredList = false; } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index a522e5a7be..b839131135 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -41,9 +41,6 @@ import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.util.LoggingBrokerPlugin; import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.leveldb.LevelDBStore; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +72,7 @@ public class AMQ2149Test extends AutoFailTestSupport { String brokerURL = DEFAULT_BROKER_URL; int numBrokerRestarts = 0; - final static int MAX_BROKER_RESTARTS = 3; + final static int MAX_BROKER_RESTARTS = 4; BrokerService broker; Vector exceptions = new Vector(); @@ -171,6 +168,7 @@ public class AMQ2149Test extends AutoFailTestSupport { } final int TRANSACITON_BATCH = 500; + boolean resumeOnNextOrPreviousIsOk = false; public void onMessage(Message message) { try { final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY); @@ -182,6 +180,16 @@ public class AMQ2149Test extends AutoFailTestSupport { session.commit(); } } + if (resumeOnNextOrPreviousIsOk) { + // after an indoubt commit we need to accept what we get (within reason) + if (seqNum != nextExpectedSeqNum) { + if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) { + nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + LOG.info("In doubt commit failed, getting replay at:" + nextExpectedSeqNum); + } + } + resumeOnNextOrPreviousIsOk = false; + } if (seqNum != nextExpectedSeqNum) { LOG.warn(dest + " received " + seqNum + " in msg: " + message.getJMSMessageID() @@ -196,8 +204,16 @@ public class AMQ2149Test extends AutoFailTestSupport { lastId = message.getJMSMessageID(); } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery) { LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery); - // batch will be replayed - nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion in doubt")) { + // in doubt - either commit command or reply missing + // don't know if we will get a replay + resumeOnNextOrPreviousIsOk = true; + } else { + resumeOnNextOrPreviousIsOk = false; + // batch will be replayed + nextExpectedSeqNum -= (TRANSACITON_BATCH -1); + } + } catch (Throwable e) { LOG.error(dest + " onMessage error", e); exceptions.add(e); @@ -327,7 +343,7 @@ public class AMQ2149Test extends AutoFailTestSupport { public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception { numtoSend = 10000; sleepBetweenSend = 3; - brokerStopPeriod = 30 * 1000; + brokerStopPeriod = 10 * 1000; createBroker(new Configurer() { public void configure(BrokerService broker) throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index efd55a8321..9115c15b2b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -289,11 +289,14 @@ public class FailoverConsumerOutstandingCommitTest { assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS)); assertTrue("commit failed", gotCommitException.get()); assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS)); - assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(0).getText()); - // it was redelivered - assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText()); + int receivedIndex = 0; + assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText()); + if (!doActualBrokerCommit) { + // it will be redelivered and not tracked as a duplicate + assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText()); + } assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS)); - assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText()); + assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText()); connection.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java index 0bd3ee51b9..a6e07d487f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java @@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -95,6 +97,141 @@ public class FailoverConsumerUnconsumedTest { doTestFailoverConsumerDups(false); } + @SuppressWarnings("unchecked") + @Test + public void testFailoverClientAckMissingRedelivery() throws Exception { + + final int maxConsumers = 2; + broker = createBroker(true); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + int consumerCount; + + // broker is killed on x create consumer + @Override + public Subscription addConsumer(ConnectionContext context, + final ConsumerInfo info) throws Exception { + if (++consumerCount == maxConsumers) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker on consumer: " + info.getConsumerId()); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + return super.addConsumer(context, info); + } + } + }); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + cf.setWatchTopicAdvisories(false); + + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch); + + final Vector testConsumers = new Vector(); + TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection); + testConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + LOG.info("onMessage:" + message.getJMSMessageID()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + testConsumers.add(testConsumer); + + + produceMessage(consumerSession, destination, maxConsumers * prefetch); + + assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + int totalDelivered = 0; + for (TestConsumer testConsumer : testConsumers) { + long delivered = testConsumer.deliveredSize(); + LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered); + totalDelivered += delivered; + } + return totalDelivered == maxConsumers * prefetch; + } + })); + + final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1); + + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + LOG.info("add last consumer..."); + TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection); + testConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + LOG.info("onMessage:" + message.getJMSMessageID()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + testConsumers.add(testConsumer); + shutdownConsumerAdded.countDown(); + LOG.info("done add last consumer"); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + // will be stopped by the plugin + broker.waitUntilStopped(); + + broker = createBroker(false, this.url); + broker.start(); + + assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS)); + + // each should again get prefetch messages - all unacked deliveries should be rolledback + assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + int totalDelivered = 0; + for (TestConsumer testConsumer : testConsumers) { + long delivered = testConsumer.deliveredSize(); + LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered); + totalDelivered += delivered; + } + return totalDelivered == maxConsumers * prefetch; + } + })); + + assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + for (TestConsumer testConsumer : testConsumers) { + long delivered = testConsumer.deliveredSize(); + LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered); + if (delivered != prefetch) { + return false; + } + } + return true; + } + })); + + connection.close(); + } + @SuppressWarnings("unchecked") public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception { @@ -156,14 +293,14 @@ public class FailoverConsumerUnconsumedTest { } })); - final CountDownLatch commitDoneLatch = new CountDownLatch(1); + final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1); Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { try { LOG.info("add last consumer..."); testConsumers.add(new TestConsumer(consumerSession, destination, connection)); - commitDoneLatch.countDown(); + shutdownConsumerAdded.countDown(); LOG.info("done add last consumer"); } catch (Exception e) { e.printStackTrace(); @@ -190,7 +327,7 @@ public class FailoverConsumerUnconsumedTest { broker = createBroker(false, this.url); broker.start(); - assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS)); // each should again get prefetch messages - all unconsumed deliveries should be rolledback assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() { @@ -231,6 +368,10 @@ public class FailoverConsumerUnconsumedTest { public int unconsumedSize() { return unconsumedMessages.size(); } + + public int deliveredSize() { + return deliveredMessages.size(); + } } static long idGen = 100; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java index 9c27d53152..ceb48ecf45 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java @@ -203,7 +203,7 @@ public class FailoverDuplicateTest extends TestSupport { receiveConnection.close(); // verify stats - assertEquals("expect all messages are dequeued with one duplicate", totalSent +1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent + 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); Wait.waitFor(new Wait.Condition() { @Override @@ -212,7 +212,7 @@ public class FailoverDuplicateTest extends TestSupport { return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount(); } }); - assertEquals("dequeue correct, including duplicate dispatch auto acked", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); + assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); // ensure no dangling messages with fresh broker etc broker.stop();