ARTEMIS-3636 LinkedListImpl leak on mesage consume error
This commit is contained in:
parent
1c9516db6a
commit
54f4cb560c
|
@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.client.impl;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
@ -43,6 +42,7 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
|
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
|
||||||
|
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
|
||||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
|
||||||
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
|
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -710,10 +710,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
// Need to send credits for the messages in the buffer
|
// Need to send credits for the messages in the buffer
|
||||||
|
|
||||||
Iterator<ClientMessageInternal> iter = buffer.iterator();
|
try (LinkedListIterator<ClientMessageInternal> iter = buffer.iterator()) {
|
||||||
|
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
try {
|
|
||||||
ClientMessageInternal message = iter.next();
|
ClientMessageInternal message = iter.next();
|
||||||
|
|
||||||
if (message.isLargeMessage()) {
|
if (message.isLargeMessage()) {
|
||||||
|
@ -722,10 +720,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
flowControlBeforeConsumption(message);
|
flowControlBeforeConsumption(message);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
|
ActiveMQClientLogger.LOGGER.errorClearingMessages(e);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
clearBuffer();
|
clearBuffer();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue