From 2ec32b67af3f494d65a076b32f71560168bb6ec9 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 5 Jul 2016 19:32:17 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5426 Fixing a race condition in ActiveMQMessageConsumer that could cause a NPE when the consumer is closing Thanks to Michael Wong for providing the test case for this issue. (cherry picked from commit 6bfa13b6e707fb3465a9193cd44c478514fcd948) --- .../activemq/ActiveMQMessageConsumer.java | 42 ++-- .../org/apache/activemq/bugs/AMQ5426Test.java | 227 ++++++++++++++++++ 2 files changed, 251 insertions(+), 18 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 90ca7c2a24..f2a78da0d1 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private boolean clearDeliveredList; AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0); - private MessageAck pendingAck; + private volatile MessageAck pendingAck; private long lastDeliveredSequenceId = -1; private IOException failureError; @@ -780,6 +780,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void deliverAcks() { MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { + //Capture the pendingAck reference in case the optimizeAcknowledge dispatch + //thread mutates it + final MessageAck oldPendingAck = pendingAck; if (isAutoAcknowledgeEach()) { synchronized(deliveredMessages) { ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); @@ -787,12 +790,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredMessages.clear(); ackCounter = 0; } else { - ack = pendingAck; + ack = oldPendingAck; pendingAck = null; } } - } else if (pendingAck != null && pendingAck.isStandardAck()) { - ack = pendingAck; + } else if (oldPendingAck != null && oldPendingAck.isStandardAck()) { + ack = oldPendingAck; pendingAck = null; } if (ack != null) { @@ -971,8 +974,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() - if (pendingAck != null && deliveredCounter > 0) { - session.sendAck(pendingAck); + final MessageAck oldPendingAck = pendingAck; + if (oldPendingAck != null && deliveredCounter > 0) { + session.sendAck(oldPendingAck); pendingAck = null; deliveredCounter = 0; } @@ -1035,29 +1039,31 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC deliveredCounter++; - MessageAck oldPendingAck = pendingAck; - pendingAck = new MessageAck(md, ackType, deliveredCounter); - pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); - if( oldPendingAck==null ) { - pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); - } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { - pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); + final MessageAck oldPendingAck = pendingAck; + final MessageAck newPendingAck = new MessageAck(md, ackType, deliveredCounter); + newPendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); + if (oldPendingAck == null) { + newPendingAck.setFirstMessageId(newPendingAck.getLastMessageId()); + } else if (oldPendingAck.getAckType() == newPendingAck.getAckType()) { + newPendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); } else { // old pending ack being superseded by ack of another type, if is is not a delivered // ack and hence important, send it now so it is not lost. if (!oldPendingAck.isDeliveredAck()) { - LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck); + LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, newPendingAck); session.sendAck(oldPendingAck); } else { - LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck); + LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, newPendingAck); } } + pendingAck = newPendingAck; + // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) { - LOG.debug("ackLater: sending: {}", pendingAck); - session.sendAck(pendingAck); - pendingAck=null; + LOG.debug("ackLater: sending: {}", newPendingAck); + session.sendAck(newPendingAck); + pendingAck = null; deliveredCounter = 0; additionalWindowSize = 0; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java new file mode 100644 index 0000000000..da09aafafd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5426Test.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertFalse; + +import java.io.InterruptedIOException; +import java.net.URI; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.log4j.Appender; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ5426Test { + + private static final Logger LOG = LoggerFactory + .getLogger(AMQ5426Test.class); + + private BrokerService brokerService; + private String connectionUri; + private AtomicBoolean hasFailureInProducer = new AtomicBoolean(false); + private Thread producerThread; + private AtomicBoolean hasErrorInLogger; + private Appender errorDetectorAppender; + + protected ConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory( + connectionUri); + conFactory.setWatchTopicAdvisories(false); + conFactory.setOptimizeAcknowledge(true); + return conFactory; + } + + @Before + public void setUp() throws Exception { + hasFailureInProducer = new AtomicBoolean(false); + hasErrorInLogger = new AtomicBoolean(false); + brokerService = BrokerFactory.createBroker(new URI( + "broker://()/localhost?persistent=false&useJmx=true")); + + PolicyEntry policy = new PolicyEntry(); + policy.setTopicPrefetch(100); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + connectionUri = brokerService.getTransportConnectorByScheme("tcp") + .getPublishableConnectString(); + + // Register an error listener to LOG4J + // The NPE will not be detectable as of V5.10 from + // ActiveMQConnection.setClientInternalExceptionListener + // since ActiveMQMessageConsumer.dispatch will silently catch and + // discard any RuntimeException + errorDetectorAppender = new AppenderSkeleton() { + @Override + public void close() { + // Do nothing + } + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + protected void append(LoggingEvent event) { + if (event.getLevel().isGreaterOrEqual(Level.ERROR)) + hasErrorInLogger.set(true); + } + }; + + org.apache.log4j.Logger.getRootLogger().addAppender(errorDetectorAppender); + producerThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Connection connection = createConnectionFactory() + .createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic("test.AMQ5426"); + LOG.debug("Created topic: {}", destination); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.setTimeToLive(1000); + LOG.debug("Created producer: {}", producer); + + int i = 1; + while (!Thread.interrupted()) { + try { + TextMessage msg = session.createTextMessage(" testMessage " + i); + producer.send(msg); + try { + // Sleep for some nano seconds + Thread.sleep(0, 100); + } catch (InterruptedException e) { + // Restore the interrupt + Thread.currentThread().interrupt(); + } + LOG.debug("message sent: {}", i); + i++; + } catch (JMSException e) { + // Sometimes, we will gt a JMSException with nested + // InterruptedIOException when we interrupt the thread + if (!(e.getCause() != null && e.getCause() instanceof InterruptedIOException)) { + throw e; + } + } + } + + producer.close(); + session.close(); + connection.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + hasFailureInProducer.set(true); + } + } + }); + + producerThread.start(); + } + + @Test(timeout = 2 * 60 * 1000) + public void testConsumerProperlyClosedWithoutError() throws Exception { + Random rn = new Random(); + + final int NUMBER_OF_RUNS = 1000; + + for (int run = 0; run < NUMBER_OF_RUNS; run++) { + final AtomicInteger numberOfMessagesReceived = new AtomicInteger(0); + LOG.info("Starting run {} of {}", run, NUMBER_OF_RUNS); + + // Starts a consumer + Connection connection = createConnectionFactory().createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + Topic destination = session.createTopic("test.AMQ5426"); + + LOG.debug("Created topic: {}", destination); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + LOG.debug("Received message"); + numberOfMessagesReceived.getAndIncrement(); + } + }); + LOG.debug("Created consumer: {}", consumer); + + try { + // Sleep for a random time + Thread.sleep(rn.nextInt(5) + 1); + } catch (InterruptedException e) { + // Restore the interrupt + Thread.currentThread().interrupt(); + } + + // Close the consumer + LOG.debug("Closing consumer"); + consumer.close(); + session.close(); + connection.close(); + + assertFalse("Exception in Producer Thread", hasFailureInProducer.get()); + assertFalse("Error detected in Logger", hasErrorInLogger.get()); + LOG.info("Run {} of {} completed, message received: {}", run, + NUMBER_OF_RUNS, numberOfMessagesReceived.get()); + } + } + + @After + public void tearDown() throws Exception { + // Interrupt the producer thread + LOG.info("Shutdown producer thread"); + producerThread.interrupt(); + producerThread.join(); + brokerService.stop(); + brokerService.waitUntilStopped(); + + assertFalse("Exception in Producer Thread", hasFailureInProducer.get()); + assertFalse("Error detected in Logger", hasErrorInLogger.get()); + } +}