mirror of https://github.com/apache/activemq.git
Gate the session clear in progress code so that overlapping transportInterrupted calls don't start consuming lots of memory for no reason.
This commit is contained in:
parent
2f9c43f11f
commit
c7d66e944d
|
@ -221,6 +221,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
protected boolean sessionAsyncDispatch;
|
||||
protected final boolean debug;
|
||||
protected Object sendMutex = new Object();
|
||||
private final AtomicBoolean clearInProgress = new AtomicBoolean();
|
||||
|
||||
private MessageListener messageListener;
|
||||
private final JMSSessionStatsImpl stats;
|
||||
|
@ -650,21 +651,39 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
|
||||
void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
|
||||
executor.clearMessagesInProgress();
|
||||
// we are called from inside the transport reconnection logic
|
||||
// which involves us clearing all the connections' consumers
|
||||
// dispatch and delivered lists. So rather than trying to
|
||||
// grab a mutex (which could be already owned by the message
|
||||
// listener calling the send or an ack) we allow it to complete in
|
||||
// a separate thread via the scheduler and notify us via
|
||||
// connection.transportInterruptionProcessingComplete()
|
||||
// we are called from inside the transport reconnection logic which involves us
|
||||
// clearing all the connections' consumers dispatch and delivered lists. So rather
|
||||
// than trying to grab a mutex (which could be already owned by the message listener
|
||||
// calling the send or an ack) we allow it to complete in a separate thread via the
|
||||
// scheduler and notify us via connection.transportInterruptionProcessingComplete()
|
||||
//
|
||||
for (final ActiveMQMessageConsumer consumer : consumers) {
|
||||
consumer.inProgressClearRequired();
|
||||
transportInterruptionProcessingComplete.incrementAndGet();
|
||||
// We must be careful though not to allow multiple calls to this method from a
|
||||
// connection that is having issue becoming fully established from causing a large
|
||||
// build up of scheduled tasks to clear the same consumers over and over.
|
||||
if (consumers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (clearInProgress.compareAndSet(false, true)) {
|
||||
for (final ActiveMQMessageConsumer consumer : consumers) {
|
||||
consumer.inProgressClearRequired();
|
||||
transportInterruptionProcessingComplete.incrementAndGet();
|
||||
try {
|
||||
connection.getScheduler().executeAfterDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumer.clearMessagesInProgress();
|
||||
}}, 0l);
|
||||
} catch (JMSException e) {
|
||||
connection.onClientInternalException(e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
connection.getScheduler().executeAfterDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumer.clearMessagesInProgress();
|
||||
clearInProgress.set(false);
|
||||
}}, 0l);
|
||||
} catch (JMSException e) {
|
||||
connection.onClientInternalException(e);
|
||||
|
|
Loading…
Reference in New Issue