diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 0a96134588..47ed980ddf 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -221,6 +221,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta protected boolean sessionAsyncDispatch; protected final boolean debug; protected Object sendMutex = new Object(); + private final AtomicBoolean clearInProgress = new AtomicBoolean(); private MessageListener messageListener; private final JMSSessionStatsImpl stats; @@ -650,21 +651,39 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { executor.clearMessagesInProgress(); - // we are called from inside the transport reconnection logic - // which involves us clearing all the connections' consumers - // dispatch and delivered lists. So rather than trying to - // grab a mutex (which could be already owned by the message - // listener calling the send or an ack) we allow it to complete in - // a separate thread via the scheduler and notify us via - // connection.transportInterruptionProcessingComplete() + // we are called from inside the transport reconnection logic which involves us + // clearing all the connections' consumers dispatch and delivered lists. So rather + // than trying to grab a mutex (which could be already owned by the message listener + // calling the send or an ack) we allow it to complete in a separate thread via the + // scheduler and notify us via connection.transportInterruptionProcessingComplete() // - for (final ActiveMQMessageConsumer consumer : consumers) { - consumer.inProgressClearRequired(); - transportInterruptionProcessingComplete.incrementAndGet(); + // We must be careful though not to allow multiple calls to this method from a + // connection that is having issue becoming fully established from causing a large + // build up of scheduled tasks to clear the same consumers over and over. + if (consumers.isEmpty()) { + return; + } + + if (clearInProgress.compareAndSet(false, true)) { + for (final ActiveMQMessageConsumer consumer : consumers) { + consumer.inProgressClearRequired(); + transportInterruptionProcessingComplete.incrementAndGet(); + try { + connection.getScheduler().executeAfterDelay(new Runnable() { + @Override + public void run() { + consumer.clearMessagesInProgress(); + }}, 0l); + } catch (JMSException e) { + connection.onClientInternalException(e); + } + } + try { connection.getScheduler().executeAfterDelay(new Runnable() { + @Override public void run() { - consumer.clearMessagesInProgress(); + clearInProgress.set(false); }}, 0l); } catch (JMSException e) { connection.onClientInternalException(e);