From 0bc545b5d161fc9feccd19e7fde6cd5d5ffac28f Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 8 Jan 2010 00:17:37 +0000 Subject: [PATCH] resolve potential lost ack with failover and an in progress consumer transaction that results in an Unmatched ack exception - resolve: https://issues.apache.org/activemq/browse/AMQ-2560 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@897061 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 53 ++-- .../broker/region/PrefetchSubscription.java | 11 +- .../jdbc/adapter/DefaultJDBCAdapter.java | 5 +- .../failover/FailoverTransactionTest.java | 291 ++++++++++++++++-- 4 files changed, 307 insertions(+), 53 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 883912a63a..e64d60baff 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -755,6 +755,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * broker to pull a message we are about to receive */ protected void sendPullCommand(long timeout) throws JMSException { + synchronized (unconsumedMessages.getMutex()) { + clearDispatchListOnReconnect(); + } if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); @@ -1067,25 +1070,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageListener listener = this.messageListener.get(); try { synchronized (unconsumedMessages.getMutex()) { - if (clearDispatchList) { - // we are reconnecting so lets flush the in progress - // messages - clearDispatchList = false; - List list = unconsumedMessages.removeAll(); - if (!this.info.isBrowser()) { - for (MessageDispatch old : list) { - // ensure we don't filter this as a duplicate - session.connection.rollbackDuplicate(this, old.getMessage()); - } - } - if (!session.isTransacted()) { - // clean, so we don't have duplicates with optimizeAcknowledge - synchronized (deliveredMessages) { - deliveredMessages.clear(); - } - } - pendingAck = null; - } + clearDispatchListOnReconnect(); if (!unconsumedMessages.isClosed()) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (listener != null && unconsumedMessages.isRunning()) { @@ -1118,13 +1103,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (LOG.isDebugEnabled()) { LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); } - // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in. - // the normal ack will happen in the transaction. - if (session.isTransacted()) { - ackLater(md, MessageAck.DELIVERED_ACK_TYPE); - } else { - acknowledge(md); - } + acknowledge(md); } } } @@ -1137,6 +1116,28 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + // called holding unconsumedMessages.getMutex() + private void clearDispatchListOnReconnect() { + if (clearDispatchList) { + // we are reconnecting so lets flush the in progress + // messages + clearDispatchList = false; + List list = unconsumedMessages.removeAll(); + if (!this.info.isBrowser()) { + for (MessageDispatch old : list) { + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, old.getMessage()); + } + } + + // clean, so we don't have duplicates with optimizeAcknowledge + synchronized (deliveredMessages) { + deliveredMessages.clear(); + } + pendingAck = null; + } + } + public int getMessageSize() { return unconsumedMessages.size(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 5f05528022..9559767214 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -437,15 +437,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } if (!checkFoundStart && firstAckedMsg != null) - throw new JMSException("Unmatched acknowledege: " + ack + throw new JMSException("Unmatched acknowledge: " + ack + "; Could not find Message-ID " + firstAckedMsg + " in dispatched-list (start of ack)"); if (!checkFoundEnd && lastAckedMsg != null) - throw new JMSException("Unmatched acknowledege: " + ack + throw new JMSException("Unmatched acknowledge: " + ack + "; Could not find Message-ID " + lastAckedMsg + " in dispatched-list (end of ack)"); if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { - throw new JMSException("Unmatched acknowledege: " + ack + throw new JMSException("Unmatched acknowledge: " + ack + "; Expected message count (" + ack.getMessageCount() + ") differs from count in dispatched-list (" + checkCount + ")"); @@ -663,9 +663,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription { node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); node.getRegionDestination().getDestinationStatistics().getInflight().increment(); if (LOG.isTraceEnabled()) { - LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId() - + ", dispatched: " + node.getRegionDestination().getDestinationStatistics().getDispatched().getCount() - + ", inflight: " + node.getRegionDestination().getDestinationStatistics().getInflight().getCount()); + LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size()); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 1493dde5c5..5a5d67e045 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -336,11 +336,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter { s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); s.setMaxRows(limit); rs = s.executeQuery(); - // jdbc scrollable cursor requires jdbc ver > 1.0 andis often implemented locally so avoid + // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid LinkedList reverseOrderIds = new LinkedList(); while (rs.next()) { reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); } + if (LOG.isDebugEnabled()) { + LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); + } for (MessageId id : reverseOrderIds) { listener.messageId(id); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index fe014b9709..35ef49ef6c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -16,17 +16,20 @@ */ package org.apache.activemq.transport.failover; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -39,6 +42,8 @@ import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; @@ -51,7 +56,7 @@ import org.junit.Test; public class FailoverTransactionTest { private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class); - private static final String QUEUE_NAME = "test.FailoverTransactionTest"; + private static final String QUEUE_NAME = "FailoverWithTx"; private String url = "tcp://localhost:61616"; BrokerService broker; @@ -79,7 +84,7 @@ public class FailoverTransactionTest { return broker; } - //@Test + @Test public void testFailoverProducerCloseBeforeTransaction() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -89,13 +94,7 @@ public class FailoverTransactionTest { Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - TextMessage message = session.createTextMessage("Test message"); - producer.send(message); - - // close producer before commit, emulate jmstemplate - producer.close(); + produceMessage(session, destination); // restart to force failover and connection state recovery before the commit broker.stop(); @@ -157,10 +156,7 @@ public class FailoverTransactionTest { Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - TextMessage message = session.createTextMessage("Test message"); - producer.send(message); + produceMessage(session, destination); final CountDownLatch commitDoneLatch = new CountDownLatch(1); // broker will die on commit reply so this will hang till restart @@ -243,13 +239,7 @@ public class FailoverTransactionTest { Queue destination = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - TextMessage message = session.createTextMessage("Test message"); - producer.send(message); - - // close producer before commit, emulate jmstemplate - producer.close(); + produceMessage(session, destination); // restart to force failover and connection state recovery before the commit broker.stop(); @@ -294,4 +284,265 @@ public class FailoverTransactionTest { session.commit(); connection.close(); } + + @Test + public void testFailoverConsumerCommitLost() throws Exception { + final int adapter = 0; + broker = createBroker(true); + setPersistenceAdapter(adapter); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + + @Override + public void commitTransaction(ConnectionContext context, + TransactionId xid, boolean onePhase) throws Exception { + super.commitTransaction(context, xid, onePhase); + // so commit will hang as if reply is lost + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker post commit..."); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + }); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + Connection connection = cf.createConnection(); + connection.start(); + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue destination = producerSession.createQueue(QUEUE_NAME); + + final MessageConsumer consumer = consumerSession.createConsumer(destination); + + produceMessage(producerSession, destination); + + final Vector receivedMessages = new Vector(); + final CountDownLatch commitDoneLatch = new CountDownLatch(1); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async commit after consume..."); + try { + Message msg = consumer.receive(20000); + LOG.info("Got message: " + msg); + receivedMessages.add(msg); + consumerSession.commit(); + commitDoneLatch.countDown(); + LOG.info("done async commit"); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + + // will be stopped by the plugin + broker.waitUntilStopped(); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + + assertEquals("we got a message", 1, receivedMessages.size()); + + // new transaction + Message msg = consumer.receive(20000); + LOG.info("Received: " + msg); + assertNull("we did not get a duplicate message", msg); + consumerSession.commit(); + consumer.close(); + connection.close(); + + // ensure no dangling messages with fresh broker etc + broker.stop(); + broker.waitUntilStopped(); + + LOG.info("Checking for remaining/hung messages.."); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + // after restart, ensure no dangling messages + cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + connection = cf.createConnection(); + connection.start(); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(destination); + msg = consumer2.receive(1000); + if (msg == null) { + msg = consumer2.receive(5000); + } + LOG.info("Received: " + msg); + assertNull("no messges left dangling but got: " + msg, msg); + connection.close(); + } + + @Test + public void testFailoverConsumerAckLost() throws Exception { + // as failure depends on hash order, do a few times + for (int i=0; i<4; i++) { + try { + doTestFailoverConsumerAckLost(); + } finally { + stopBroker(); + } + } + } + + public void doTestFailoverConsumerAckLost() throws Exception { + final int adapter = 0; + broker = createBroker(true); + setPersistenceAdapter(adapter); + + broker.setPlugins(new BrokerPlugin[] { + new BrokerPluginSupport() { + + // broker is killed on delivered ack as prefetch is 1 + @Override + public void acknowledge( + ConsumerBrokerExchange consumerExchange, + final MessageAck ack) throws Exception { + + consumerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker on ack: " + ack); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + }); + broker.start(); + + Vector connections = new Vector(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + Connection connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + final MessageConsumer consumer1 = consumerSession1.createConsumer(destination); + final MessageConsumer consumer2 = consumerSession2.createConsumer(destination); + + produceMessage(producerSession, destination); + produceMessage(producerSession, destination); + + final Vector receivedMessages = new Vector(); + final CountDownLatch commitDoneLatch = new CountDownLatch(1); + + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async commit after consume..."); + try { + Message msg = consumer1.receive(20000); + LOG.info("consumer1 first attempt got message: " + msg); + receivedMessages.add(msg); + + TimeUnit.SECONDS.sleep(7); + + // should not get a second message as there are two messages and two consumers + // but with failover and unordered connection reinit it can get the second + // message which will have a problem for the ack + msg = consumer1.receive(5000); + LOG.info("consumer1 second attempt got message: " + msg); + if (msg != null) { + receivedMessages.add(msg); + } + + LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); + consumerSession1.commit(); + commitDoneLatch.countDown(); + LOG.info("done async commit"); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + + // will be stopped by the plugin + broker.waitUntilStopped(); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + + // getting 2 is indicative of a problem - proven with dangling message found after restart + LOG.info("received message count: " + receivedMessages.size()); + + // new transaction + Message msg = consumer1.receive(2000); + LOG.info("post: from consumer1 received: " + msg); + assertNull("should be nothing left for consumer1", msg); + consumerSession1.commit(); + + // consumer2 should get other message + msg = consumer2.receive(5000); + LOG.info("post: from consumer2 received: " + msg); + assertNotNull("got message on consumer2", msg); + consumerSession2.commit(); + + for (Connection c: connections) { + c.close(); + } + + // ensure no dangling messages with fresh broker etc + broker.stop(); + broker.waitUntilStopped(); + + LOG.info("Checking for remaining/hung messages.."); + broker = createBroker(false); + setPersistenceAdapter(adapter); + broker.start(); + + // after restart, ensure no dangling messages + cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + connection = cf.createConnection(); + connection.start(); + Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer sweeper = sweeperSession.createConsumer(destination); + msg = sweeper.receive(1000); + if (msg == null) { + msg = sweeper.receive(5000); + } + LOG.info("Received: " + msg); + assertNull("no messges left dangling but got: " + msg, msg); + connection.close(); + } + + private void produceMessage(final Session producerSession, Queue destination) + throws JMSException { + MessageProducer producer = producerSession.createProducer(destination); + TextMessage message = producerSession.createTextMessage("Test message"); + producer.send(message); + producer.close(); + } + }