diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index b3c8873d0f..8ccb8aa17a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -16,6 +16,28 @@ */ package org.apache.activemq; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.TransactionRolledBackException; + import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.activemq.command.ActiveMQDestination; @@ -29,6 +51,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; @@ -40,27 +63,6 @@ import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.TransactionRolledBackException; /** * A client uses a MessageConsumer object to receive messages @@ -100,6 +102,14 @@ import javax.jms.TransactionRolledBackException; */ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { + @SuppressWarnings("serial") + class PreviouslyDeliveredMap extends HashMap { + final TransactionId transactionId; + public PreviouslyDeliveredMap(TransactionId transactionId) { + this.transactionId = transactionId; + } + } + private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); protected static final Scheduler scheduler = Scheduler.getInstance(); protected final ActiveMQSession session; @@ -113,7 +123,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // Always walk list in reverse order. private final LinkedList deliveredMessages = new LinkedList(); // track duplicate deliveries in a transaction such that the tx integrity can be validated - private HashMap previouslyDeliveredMessages; + private PreviouslyDeliveredMap previouslyDeliveredMessages; private int deliveredCounter; private int additionalWindowSize; private long redeliveryDelay; @@ -143,7 +153,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private long optimizeAckTimestamp = System.currentTimeMillis(); private long optimizeAckTimeout = 300; private long failoverRedeliveryWaitPeriod = 0; - private boolean rollbackInitiated; /** * Create a MessageConsumer @@ -558,8 +567,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC MessageDispatch md; if (info.getPrefetchSize() == 0) { - md = dequeue(-1); // We let the broker let us know when we - // timeout. + md = dequeue(-1); // We let the broker let us know when we timeout. } else { md = dequeue(timeout); } @@ -992,7 +1000,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } if (numberNotReplayed > 0) { - LOG.info("waiting for redelivery of " + numberNotReplayed + " to consumer :" + this.getConsumerId()); + LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: " + + previouslyDeliveredMessages.transactionId + ", to consumer :" + this.getConsumerId()); try { Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4)); } catch (InterruptedException outOfhere) { @@ -1008,12 +1017,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC */ private void rollbackOnFailedRecoveryRedelivery() throws JMSException { if (previouslyDeliveredMessages != null) { - if (rollbackInitiated) { - // second call from rollback, nothing more to do - // REVISIT - should beforeEnd be called again by transaction context? - rollbackInitiated = false; - return; - } // if any previously delivered messages was not re-delivered, transaction is invalid and must rollback // as messages have been dispatched else where. int numberNotReplayed = 0; @@ -1021,20 +1024,20 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (!entry.getValue()) { numberNotReplayed++; if (LOG.isDebugEnabled()) { - LOG.debug("previously delivered message has not been replayed in transaction, id: " + entry.getKey()); + LOG.debug("previously delivered message has not been replayed in transaction: " + + previouslyDeliveredMessages.transactionId + + " , messageId: " + entry.getKey()); } } } - if (numberNotReplayed > 0) { - String message = "rolling back transaction post failover recovery. " + numberNotReplayed + String message = "rolling back transaction (" + + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId(); LOG.warn(message); - rollbackInitiated = true; - throw new TransactionRolledBackException(message); + throw new TransactionRolledBackException(message); } } - } void acknowledge(MessageDispatch md) throws JMSException { @@ -1157,7 +1160,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC removeFromDeliveredMessages(entry.getKey()); } } - rollbackInitiated = false; clearPreviouslyDelivered(); } } @@ -1166,9 +1168,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * called with deliveredMessages locked */ private void removeFromDeliveredMessages(MessageId key) { - ListIterator iterator = deliveredMessages.listIterator(deliveredMessages.size()); - while (iterator.hasPrevious()) { - MessageDispatch candidate = iterator.previous(); + Iterator iterator = deliveredMessages.iterator(); + while (iterator.hasNext()) { + MessageDispatch candidate = iterator.next(); if (key.equals(candidate.getMessage().getMessageId())) { session.connection.rollbackDuplicate(this, candidate.getMessage()); iterator.remove(); @@ -1234,13 +1236,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (previouslyDeliveredMessages != null) { previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true); } else { - // existing transaction gone but still a duplicate!, lets mark as poison ftm, - // possibly could allow redelivery.. + // delivery while pending redelivery to another consumer on the same connection + // not waiting for redelivery will help here needsPoisonAck = true; } } if (needsPoisonAck) { - LOG.warn("acking as poison, duplicate transacted delivery but no recovering transaction for: " + md); + LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another" + + " consumer on this connection, failoverRedeliveryWaitPeriod=" + + failoverRedeliveryWaitPeriod + ". Message: " + md); MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); poisonAck.setFirstMessageId(md.getMessage().getMessageId()); session.sendAck(poisonAck); @@ -1271,7 +1275,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt"); } if (previouslyDeliveredMessages == null) { - previouslyDeliveredMessages = new HashMap(); + previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.getTransactionContext().getTransactionId()); } for (MessageDispatch delivered : deliveredMessages) { previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false); diff --git a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java index 6409c205f1..6319974ca1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java @@ -79,6 +79,7 @@ public class TransactionContext implements XAResource { private Xid associatedXid; private TransactionId transactionId; private LocalTransactionEventListener localTransactionEventListener; + private int beforeEndIndex; public TransactionContext(ActiveMQConnection connection) { this.connection = connection; @@ -174,8 +175,8 @@ public class TransactionContext implements XAResource { int size = synchronizations.size(); try { - for (int i = 0; i < size; i++) { - synchronizations.get(i).beforeEnd(); + for (;beforeEndIndex < size;) { + synchronizations.get(beforeEndIndex++).beforeEnd(); } } catch (JMSException e) { throw e; @@ -206,6 +207,7 @@ public class TransactionContext implements XAResource { if (transactionId == null) { synchronizations = null; + beforeEndIndex = 0; this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); this.connection.ensureConnectionInfoSent(); @@ -341,6 +343,7 @@ public class TransactionContext implements XAResource { // associate synchronizations = null; + beforeEndIndex = 0; setXid(xid); } diff --git a/activemq-core/src/test/java/org/apache/activemq/TransactionContextTest.java b/activemq-core/src/test/java/org/apache/activemq/TransactionContextTest.java new file mode 100644 index 0000000000..5e45d522e8 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/TransactionContextTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.TransactionRolledBackException; + +import org.apache.activemq.transaction.Synchronization; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TransactionContextTest { + + TransactionContext underTest; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnection connection; + + + @Before + public void setup() throws Exception { + connection = factory.createActiveMQConnection(); + underTest = new TransactionContext(connection); + } + + @After + public void tearDown() throws Exception { + connection.close(); + } + + @Test + public void testSyncBeforeEndCalledOnceOnRollback() throws Exception { + final AtomicInteger beforeEndCountA = new AtomicInteger(0); + final AtomicInteger beforeEndCountB = new AtomicInteger(0); + final AtomicInteger rollbackCountA = new AtomicInteger(0); + final AtomicInteger rollbackCountB = new AtomicInteger(0); + underTest.addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + if (beforeEndCountA.getAndIncrement() == 0) { + throw new TransactionRolledBackException("force rollback"); + } + } + + @Override + public void afterCommit() throws Exception { + fail("exepcted rollback exception"); + } + + @Override + public void afterRollback() throws Exception { + rollbackCountA.incrementAndGet(); + } + + }); + + underTest.addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + beforeEndCountB.getAndIncrement(); + } + + @Override + public void afterCommit() throws Exception { + fail("exepcted rollback exception"); + } + + @Override + public void afterRollback() throws Exception { + rollbackCountB.incrementAndGet(); + } + + }); + + + try { + underTest.commit(); + fail("exepcted rollback exception"); + } catch (TransactionRolledBackException expected) { + } + + assertEquals("beforeEnd A called once", 1, beforeEndCountA.get()); + assertEquals("beforeEnd B called once", 1, beforeEndCountA.get()); + assertEquals("rollbackCount B 0", 1, rollbackCountB.get()); + assertEquals("rollbackCount A B", rollbackCountA.get(), rollbackCountB.get()); + } + + @Test + public void testSyncIndexCleared() throws Exception { + final AtomicInteger beforeEndCountA = new AtomicInteger(0); + final AtomicInteger rollbackCountA = new AtomicInteger(0); + Synchronization sync = new Synchronization() { + @Override + public void beforeEnd() throws Exception { + beforeEndCountA.getAndIncrement(); + } + @Override + public void afterCommit() throws Exception { + fail("exepcted rollback exception"); + } + @Override + public void afterRollback() throws Exception { + rollbackCountA.incrementAndGet(); + } + }; + + underTest.begin(); + underTest.addSynchronization(sync); + underTest.rollback(); + + assertEquals("beforeEnd", 1, beforeEndCountA.get()); + assertEquals("rollback", 1, rollbackCountA.get()); + + // do it again + underTest.begin(); + underTest.addSynchronization(sync); + underTest.rollback(); + + assertEquals("beforeEnd", 2, beforeEndCountA.get()); + assertEquals("rollback", 2, rollbackCountA.get()); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index cd472abd28..66083652fc 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -288,7 +288,7 @@ public class FailoverConsumerOutstandingCommitTest { Message msg = testConsumer.receive(5000); assertNotNull(msg); - // restart with out standing delivered message + // restart with outstanding delivered message broker.stop(); broker.waitUntilStopped(); broker = createBroker(false); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 122e8a0e17..d516fe7d4a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.transport.failover; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -291,8 +293,8 @@ public class FailoverTransactionTest { @Test public void testFailoverConsumerAckLost() throws Exception { - // as failure depends on hash order, do a few times - for (int i=0; i<4; i++) { + // as failure depends on hash order of state tracker recovery, do a few times + for (int i=0; i<3; i++) { try { doTestFailoverConsumerAckLost(i); } finally { @@ -482,14 +484,14 @@ public class FailoverTransactionTest { broker.stop(); broker = createBroker(false); - // use empty jdbc store so that default wait for redeliveries will timeout after failover + // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover setPersistenceAdapter(1); broker.start(); try { consumerSession.commit(); - } catch (JMSException expectedRolledback) { - assertTrue(expectedRolledback instanceof TransactionRolledBackException); + fail("expected transaciton rolledback ex"); + } catch (TransactionRolledBackException expected) { } broker.stop(); @@ -550,6 +552,65 @@ public class FailoverTransactionTest { connection.close(); } + + @Test + public void testPoisonOnDeliveryWhilePending() throws Exception { + LOG.info("testWaitForMissingRedeliveries()"); + broker = createBroker(true); + broker.start(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000"); + Connection connection = cf.createConnection(); + connection.start(); + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=0"); + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumerSession.createConsumer(destination); + + produceMessage(producerSession, destination); + Message msg = consumer.receive(20000); + if (msg == null) { + AutoFailTestSupport.dumpAllThreads("missing-"); + } + assertNotNull("got message just produced", msg); + + broker.stop(); + broker = createBroker(false); + broker.start(); + + final CountDownLatch commitDone = new CountDownLatch(1); + + + // with prefetch=0, it will not get redelivered as there will not be another receive + // for this consumer. so it will block till it timeout with an exception + // will block pending re-deliveries + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("doing async commit..."); + try { + consumerSession.commit(); + } catch (JMSException ignored) { + commitDone.countDown(); + } + } + }); + + // pull the pending message to this consumer where it will be poison as it is a duplicate without a tx + MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1")); + assertNull("consumer2 not get a message while pending to 1", consumer2.receive(2000)); + + assertTrue("commit completed with ex", commitDone.await(15, TimeUnit.SECONDS)); + assertNull("consumer should not get rolledback and non redelivered message", consumer.receive(5000)); + + // message should be in dlq + MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ")); + TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000); + assertNotNull("found message in dlq", dlqMessage); + assertEquals("text matches", "Test message", dlqMessage.getText()); + consumerSession.commit(); + + connection.close(); + } + private void produceMessage(final Session producerSession, Queue destination) throws JMSException { MessageProducer producer = producerSession.createProducer(destination);