From dc0291b290bc6d4a9d77eede1dd319ea7db8f903 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 8 Oct 2013 21:28:30 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4791 - fix and test. removed the delay but left the warn if dispatch occurs before interruption processing is complete. Problem was a race between consumer close and sessions copy-on-write dispatchers list --- .../apache/activemq/ActiveMQConnection.java | 47 +++--- .../activemq/ActiveMQConnectionConsumer.java | 6 +- .../activemq/ActiveMQMessageConsumer.java | 6 +- .../org/apache/activemq/ActiveMQSession.java | 6 +- .../state/ConnectionStateTracker.java | 1 + ...FailoverConsumerOutstandingCommitTest.java | 2 + .../failover/FailoverTransactionTest.java | 149 +++++++++++++++++- 7 files changed, 180 insertions(+), 37 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a622944924..50573234d9 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -198,7 +198,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private DestinationSource destinationSource; private final Object ensureConnectionInfoSentMutex = new Object(); private boolean useDedicatedTaskRunner; - protected volatile CountDownLatch transportInterruptionProcessingComplete; + protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0); private long consumerFailoverRedeliveryWaitPeriod; private Scheduler scheduler; private boolean messagePrioritySupported = true; @@ -2023,19 +2023,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon @Override public void transportInterupted() { - this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 
1:0)); - if (LOG.isDebugEnabled()) { - LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); - } - signalInterruptionProcessingNeeded(); - + transportInterruptionProcessingComplete.set(1); for (Iterator i = this.sessions.iterator(); i.hasNext();) { ActiveMQSession s = i.next(); - s.clearMessagesInProgress(); + s.clearMessagesInProgress(transportInterruptionProcessingComplete); } for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { - connectionConsumer.clearMessagesInProgress(); + connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete); + } + + if (transportInterruptionProcessingComplete.decrementAndGet() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get()); + } + signalInterruptionProcessingNeeded(); } for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { @@ -2462,33 +2464,23 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon } protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if (cdl != null) { - if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) { - LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete.."); - cdl.await(10, TimeUnit.SECONDS); - } + if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) { + LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get()); signalInterruptionProcessingComplete(); } } protected void transportInterruptionProcessingComplete() { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if (cdl != null) { - cdl.countDown(); - try { - 
signalInterruptionProcessingComplete(); - } catch (InterruptedException ignored) {} + if (transportInterruptionProcessingComplete.decrementAndGet() == 0) { + signalInterruptionProcessingComplete(); } } - private void signalInterruptionProcessingComplete() throws InterruptedException { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if (cdl.getCount()==0) { + private void signalInterruptionProcessingComplete() { if (LOG.isDebugEnabled()) { - LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId()); + LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get() + + " for:" + this.getConnectionInfo().getConnectionId()); } - this.transportInterruptionProcessingComplete = null; FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); if (failoverTransport != null) { @@ -2498,8 +2490,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); } } - - } + transportInterruptionProcessingComplete.set(0); } private void signalInterruptionProcessingNeeded() { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java index 7dc96a2ddf..ba4b9ddb84 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java @@ -20,6 +20,8 @@ package org.apache.activemq; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionConsumer; import javax.jms.IllegalStateException; @@ -155,11 +157,9 @@ public class ActiveMQConnectionConsumer implements ConnectionConsumer, 
ActiveMQD return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }"; } - public void clearMessagesInProgress() { + public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { // future: may want to deal with rollback of in progress messages to track re deliveries // before indicating that all is complete. - // Till there is a need, lets immediately allow dispatch - this.connection.transportInterruptionProcessingComplete(); } public ConsumerInfo getConsumerInfo() { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index baf5233d6b..39f55bf8f8 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -687,7 +687,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void doClose() throws JMSException { // Store interrupted state and clear so that Transport operations don't - // throw InterruptedException and we ensure that resources are clened up. + // throw InterruptedException and we ensure that resources are cleaned up. 
boolean interrupted = Thread.interrupted(); dispose(); RemoveInfo removeCommand = info.createRemoveCommand(); @@ -1584,4 +1584,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } } } + + public boolean hasMessageListener() { + return messageListener.get() != null; + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 3348526f56..0a96134588 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -647,7 +648,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } } - void clearMessagesInProgress() { + void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { executor.clearMessagesInProgress(); // we are called from inside the transport reconnection logic // which involves us clearing all the connections' consumers @@ -659,6 +660,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta // for (final ActiveMQMessageConsumer consumer : consumers) { consumer.inProgressClearRequired(); + transportInterruptionProcessingComplete.incrementAndGet(); try { connection.getScheduler().executeAfterDelay(new Runnable() { public void run() { @@ -2012,7 +2014,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } for (Iterator i = consumers.iterator(); i.hasNext();) { ActiveMQMessageConsumer consumer = i.next(); - if (consumer.getMessageListener() != null) { + if (consumer.hasMessageListener()) { throw new 
IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); } } diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 6a1ca36323..8fbf81bb87 100755 --- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -406,6 +406,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { if (ss != null) { ss.removeConsumer(id); } + cs.getRecoveringPullConsumers().remove(id); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index 9115c15b2b..9c08c81e63 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -167,6 +167,7 @@ public class FailoverConsumerOutstandingCommitTest { LOG.info("producer started"); try { produceMessage(producerSession, destination, prefetch * 2); + } catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown) { } catch (JMSException e) { e.printStackTrace(); fail("unexpceted ex on producer: " + e); @@ -273,6 +274,7 @@ public class FailoverConsumerOutstandingCommitTest { LOG.info("producer started"); try { produceMessage(producerSession, destination, prefetch * 2); + } catch (javax.jms.IllegalStateException SessionClosedExpectedOnShutdown) { } catch (JMSException e) { e.printStackTrace(); fail("unexpceted ex on producer: " + e); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index ed45b95e86..54a8a01666 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -17,7 +17,9 @@ package org.apache.activemq.transport.failover; import junit.framework.Test; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerPlugin; @@ -29,10 +31,12 @@ import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.util.DestinationPathSeparatorBroker; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.SocketProxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,9 +52,15 @@ import javax.jms.ServerSessionPool; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TransactionRolledBackException; +import java.io.IOException; import java.net.URI; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.NoSuchElementException; +import java.util.Stack; import java.util.Vector; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicBoolean; @@ -874,6 +884,139 @@ public class FailoverTransactionTest extends TestSupport { connection.close(); } + public void testPoolingNConsumesAfterReconnect() throws Exception { + broker = createBroker(true); + setDefaultPersistenceAdapter(broker); + + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + int count = 0; + + @Override + public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception { + if (count++ == 1) { + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker on removeConsumer: " + info); + try { + broker.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + broker.start(); + + Vector connections = new Vector(); + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); + Connection connection = cf.createConnection(); + connection.start(); + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); + + produceMessage(producerSession, destination); + connection.close(); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + final int sessionCount = 10; + final Stack sessions = new Stack(); + for (int i = 0; i < sessionCount; i++) { + sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); + } + + final int consumerCount = 1000; + final Deque consumers = new ArrayDeque(); + for (int i = 0; i < consumerCount; i++) { + consumers.push(consumerSession.createConsumer(destination)); + } + final ExecutorService executorService = Executors.newCachedThreadPool(); + + final FailoverTransport failoverTransport = ((ActiveMQConnection) 
connection).getTransport().narrow(FailoverTransport.class); + final TransportListener delegate = failoverTransport.getTransportListener(); + failoverTransport.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + delegate.onCommand(command); + } + + @Override + public void onException(IOException error) { + delegate.onException(error); + } + + @Override + public void transportInterupted() { + + LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE")); + for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) { + + executorService.execute(new Runnable() { + public void run() { + MessageConsumer localConsumer = null; + try { + synchronized (delegate) { + localConsumer = consumers.pop(); + } + localConsumer.receive(1); + + LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId()); + localConsumer.close(); + } catch (NoSuchElementException nse) { + } catch (Exception ignored) { + LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored); + } + } + }); + } + + delegate.transportInterupted(); + } + + @Override + public void transportResumed() { + delegate.transportResumed(); + } + }); + + + MessageConsumer consumer = null; + synchronized (delegate) { + consumer = consumers.pop(); + } + LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); + consumer.close(); + + // will be stopped by the plugin + broker.waitUntilStopped(); + broker = createBroker(false, url); + setDefaultPersistenceAdapter(broker); + broker.start(); + + consumer = consumerSession.createConsumer(destination); + LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); + + Message msg = null; + for (int i = 0; i < 4 && msg == null; i++) { + msg = consumer.receive(1000); + } + LOG.info("post: from consumer1 received: " + msg); + assertNotNull("got message after failover", 
msg); + msg.acknowledge(); + + for (Connection c : connections) { + c.close(); + } + } + public void testAutoRollbackWithMissingRedeliveries() throws Exception { broker = createBroker(true); broker.start(); @@ -991,8 +1134,8 @@ public class FailoverTransactionTest extends TestSupport { final Vector exceptions = new Vector(); - // commit may fail if other consumer gets the message on restart, it will be seen a a duplicate on teh connection - // but with no transaciton and it pending on another consumer it will be posion + // commit may fail if other consumer gets the message on restart, it will be seen as a duplicate on the connection + // but with no transaction and it pending on another consumer it will be poison Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { LOG.info("doing async commit..."); @@ -1012,7 +1155,7 @@ public class FailoverTransactionTest extends TestSupport { // either message consumed or sent to dlq via poison on redelivery to wrong consumer // message should not be available again in any event - assertNull("consumer should not get rolledback on non redelivered message or duplicate", consumer.receive(5000)); + assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000)); // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases if (exceptions.isEmpty()) {