From 62daac4b31fded111cf6eb6fd33b212418ce85ea Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 1 Feb 2010 22:41:17 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2590 - commit may throw a TransactionRolledBackException in the event that after a failover recovery, the same messages are not redispatched - the transaction cannot be fully recreated so it must rollback git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@905432 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 112 ++++++++++++++++-- .../org/apache/activemq/ActiveMQSession.java | 4 +- .../apache/activemq/TransactionContext.java | 14 ++- ...FailoverConsumerOutstandingCommitTest.java | 57 +++++++++ .../failover/FailoverTransactionTest.java | 30 +++-- 5 files changed, 193 insertions(+), 24 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 2d6f85e6f0..7cab785055 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -46,7 +46,9 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -58,6 +60,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.TransactionRolledBackException; /** * A client uses a MessageConsumer object to receive messages @@ -109,6 +112,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // not been acknowledged. It's kept in reverse order since we // Always walk list in reverse order. private final LinkedList deliveredMessages = new LinkedList(); + private HashMap previouslyDeliveredMessages; private int deliveredCounter; private int additionalWindowSize; private long redeliveryDelay; @@ -146,7 +150,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * @param name * @param selector * @param prefetch - * @param maximumPendingMessageCount TODO + * @param maximumPendingMessageCount * @param noLocal * @param browser * @param dispatchAsync @@ -640,7 +644,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } void clearMessagesInProgress() { - // deal with delivered messages async to avoid lock contention with in pogress acks + // deal with delivered messages async to avoid lock contention with in progress acks clearDispatchList = true; synchronized (unconsumedMessages.getMutex()) { if (LOG.isDebugEnabled()) { @@ -951,6 +955,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return; // no msgs if (session.getTransacted()) { + rollbackOnFailedRecoveryRedelivery(); session.doStartTransaction(); ack.setTransactionId(session.getTransactionContext().getTransactionId()); } @@ -967,6 +972,51 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + /* + * called with deliveredMessages locked + */ + private void rollbackOnFailedRecoveryRedelivery() throws JMSException { + if (previouslyDeliveredMessages != null) { + // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback + // as messages have been dispatched else where. + int numberNotReplayed = 0; + for (Entry entry: previouslyDeliveredMessages.entrySet()) { + if (!entry.getValue()) { + numberNotReplayed++; + // allow outstanding messages to get delivered again + removeFromDeliveredMessages(entry.getKey()); + if (LOG.isDebugEnabled()) { + LOG.debug("previously delivered message has not been replayed in transaction, id: " + entry.getKey()); + } + } + } + clearPreviouslyDelivered(); + + if (numberNotReplayed > 0) { + String message = "rolling back transaction post failover recovery. " + numberNotReplayed + + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); + LOG.warn(message); + throw new TransactionRolledBackException(message); + } + } + + } + + /* + * called with deliveredMessages locked + */ + private void removeFromDeliveredMessages(MessageId key) { + ListIterator iterator = deliveredMessages.listIterator(deliveredMessages.size()); + while (iterator.hasPrevious()) { + MessageDispatch candidate = iterator.previous(); + if (key.equals(candidate.getMessage().getMessageId())) { + session.connection.rollbackDuplicate(this, candidate.getMessage()); + iterator.remove(); + break; + } + } + } + void acknowledge(MessageDispatch md) throws JMSException { MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); session.sendAck(ack); @@ -978,6 +1028,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public void commit() throws JMSException { synchronized (deliveredMessages) { deliveredMessages.clear(); + clearPreviouslyDelivered(); } redeliveryDelay = 0; } @@ -998,6 +1049,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } synchronized(deliveredMessages) { + clearPreviouslyDelivered(); if (deliveredMessages.isEmpty()) { return; } @@ -1073,6 +1125,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } + /* + * called with deliveredMessages locked + */ + private void clearPreviouslyDelivered() { + if (previouslyDeliveredMessages != null) { + previouslyDeliveredMessages.clear(); + previouslyDeliveredMessages = null; + } + } + public void dispatch(MessageDispatch md) { MessageListener listener = this.messageListener.get(); try { @@ -1106,11 +1168,23 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } else { - // ignore duplicate - if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); + if (!session.isTransacted()) { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); + } + MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); + session.sendAck(ack); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage()); + } + synchronized (deliveredMessages) { + if (previouslyDeliveredMessages != null) { + previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); + } + } + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } - acknowledge(md); } } } @@ -1126,13 +1200,27 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again private void clearDispatchList() { if (clearDispatchList) { - synchronized (deliveredMessages) { - if (LOG.isDebugEnabled()) { - LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); - } + synchronized (deliveredMessages) { if (clearDispatchList) { - deliveredMessages.clear(); - pendingAck = null; + if (!deliveredMessages.isEmpty()) { + if (session.isTransacted()) { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " tracking delivered list (" + deliveredMessages.size() + ") on transport interrupt"); + } + if (previouslyDeliveredMessages == null) { + previouslyDeliveredMessages = new HashMap(); + } + for (MessageDispatch delivered : deliveredMessages) { + previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt"); + } + deliveredMessages.clear(); + pendingAck = null; + } + } clearDispatchList = false; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 8df938551a..5f4d86bdc2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -516,7 +516,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public boolean getTransacted() throws JMSException { checkClosed(); - return (acknowledgementMode == Session.SESSION_TRANSACTED) || (transactionContext.isInXATransaction()); + return isTransacted(); } /** @@ -1784,7 +1784,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @return true - if the session uses transactions. */ public boolean isTransacted() { - return this.acknowledgementMode == Session.SESSION_TRANSACTED; + return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java index 97d3871ef7..221c90b137 100755 --- a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import javax.jms.JMSException; import javax.jms.TransactionInProgressException; +import javax.jms.TransactionRolledBackException; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; @@ -235,7 +236,11 @@ public class TransactionContext implements XAResource { throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); } - beforeEnd(); + try { + beforeEnd(); + } catch (TransactionRolledBackException canOcurrOnFailover) { + LOG.warn("rollback processing error", canOcurrOnFailover); + } if (transactionId != null) { if (LOG.isDebugEnabled()) { LOG.debug("Rollback: " + transactionId @@ -270,7 +275,12 @@ public class TransactionContext implements XAResource { throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); } - beforeEnd(); + try { + beforeEnd(); + } catch (JMSException e) { + rollback(); + throw e; + } // Only send commit if the transaction was started. if (transactionId != null) { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index afa8a20e37..169825e45a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.failover; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; @@ -239,6 +240,62 @@ public class FailoverConsumerOutstandingCommitTest { connection.close(); } + @Test + public void testRollbackFailoverConsumerTx() throws Exception { + broker = createBroker(true); + broker.start(); + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME); + + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + final MessageConsumer testConsumer = consumerSession.createConsumer(destination); + assertNull("no message yet", testConsumer.receiveNoWait()); + + produceMessage(producerSession, destination, 1); + producerSession.close(); + + // consume then rollback after restart + Message msg = testConsumer.receive(5000); + assertNotNull(msg); + + // restart with out standing delivered message + broker.stop(); + broker.waitUntilStopped(); + broker = createBroker(false); + broker.start(); + + consumerSession.rollback(); + + // receive again + msg = testConsumer.receive(10000); + assertNotNull("got message again after rollback", msg); + + consumerSession.commit(); + + // close before sweep + consumerSession.close(); + msg = receiveMessage(cf, destination); + assertNull("should be nothing left after commit", msg); + connection.close(); + } + + private Message receiveMessage(ActiveMQConnectionFactory cf, + Queue destination) throws Exception { + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + final MessageConsumer consumer = consumerSession.createConsumer(destination); + Message msg = consumer.receive(5000); + consumerSession.commit(); + connection.close(); + return msg; + } + private void produceMessage(final Session producerSession, Queue destination, long count) throws JMSException { MessageProducer producer = producerSession.createProducer(destination); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 13b3db3f55..a1417dc128 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -36,6 +36,7 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.TransactionRolledBackException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; @@ -468,7 +469,8 @@ public class FailoverTransactionTest { // should not get a second message as there are two messages and two consumers // but with failover and unordered connection restore it can get the second - // message which could create a problem for a pending ack + // message which could create a problem for a pending ack and also invalidate + // the transaction in which the first was consumed and acked msg = consumer1.receive(5000); LOG.info("consumer1 second attempt got message: " + msg); if (msg != null) { @@ -476,7 +478,17 @@ public class FailoverTransactionTest { } LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); - consumerSession1.commit(); + try { + consumerSession1.commit(); + } catch (JMSException expectedSometimes) { + LOG.info("got rollback ex on commit", expectedSometimes); + if (expectedSometimes instanceof TransactionRolledBackException && receivedMessages.size() == 2) { + // ok, message one was not replayed so we expect the rollback + } else { + throw expectedSometimes; + } + + } commitDoneLatch.countDown(); LOG.info("done async commit"); } catch (Exception e) { @@ -494,21 +506,23 @@ public class FailoverTransactionTest { assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - // getting 2 is indicative of a problem - proven with dangling message found after restart + // getting 2 is indicative of orderiing issue. a problem if dangling message found after restart LOG.info("received message count: " + receivedMessages.size()); // new transaction Message msg = consumer1.receive(2000); LOG.info("post: from consumer1 received: " + msg); - assertNull("should be nothing left for consumer1", msg); + if (receivedMessages.size() == 1) { + assertNull("should be nothing left for consumer as recieve should have committed", msg); + } else { + assertNotNull("should be available again after commit rollback ex", msg); + } consumerSession1.commit(); - // consumer2 should get other message provided consumer1 did not get 2 + // consumer2 should get other message msg = consumer2.receive(5000); LOG.info("post: from consumer2 received: " + msg); - if (receivedMessages.size() == 1) { - assertNotNull("got second message on consumer2", msg); - } + assertNotNull("got second message on consumer2", msg); consumerSession2.commit(); for (Connection c: connections) {