diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index bf91e63da6..212a68e5ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -129,9 +129,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { protected static final int CRITICAL_CHECK_DEPAGE = 4; private static final Logger logger = Logger.getLogger(QueueImpl.class); - private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching"); - private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime"); - private static final AtomicLongFieldUpdater consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp"); + private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching"); + private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime"); + private static final AtomicLongFieldUpdater consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp"); private static final AtomicReferenceFieldUpdater filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter"); public static final int REDISTRIBUTOR_BATCH_SIZE = 100; @@ -148,8 +148,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { */ public static final int DELIVERY_TIMEOUT = 1000; - private static final int FLUSH_TIMEOUT = 10000; - public static final int DEFAULT_FLUSH_LIMIT = 500; private final long id; @@ -2962,8 +2960,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - int maxSize = pageSubscription.getPagingStore().getPageSizeBytes(); - long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT); if (logger.isTraceEnabled()) { @@ -2994,6 +2990,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (logger.isDebugEnabled()) { + int maxSize = pageSubscription.getPagingStore().getPageSizeBytes(); + if (depaged == 0 && queueMemorySize.get() >= maxSize) { logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration"); } @@ -3050,7 +3048,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); int maxDeliveries = addressSettings.getMaxDeliveryAttempts(); - long redeliveryDelay = addressSettings.getRedeliveryDelay(); int deliveryCount = reference.getDeliveryCount(); // First check DLA @@ -3063,6 +3060,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return new Pair<>(false, dlaResult); } else { // Second check Redelivery Delay + long redeliveryDelay = addressSettings.getRedeliveryDelay(); if (!ignoreRedeliveryDelay && redeliveryDelay > 0) { redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount); @@ -3950,7 +3948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { //Readonly (no remove) iterator over the messages in the queue, in order of //paging store, intermediateMessageReferences and MessageReferences - private class QueueBrowserIterator implements LinkedListIterator { + private final class QueueBrowserIterator implements LinkedListIterator { LinkedListIterator pagingIterator = null; LinkedListIterator messagesIterator = null; @@ -3962,10 +3960,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return pagingIterator; } - Iterator lastIterator = null; + Iterator lastIterator = null; MessageReference cachedNext = null; - HashSet previouslyBrowsed = new HashSet(); + HashSet previouslyBrowsed = new HashSet<>(); private QueueBrowserIterator() { messagesIterator = new SynchronizedIterator(messageReferences.iterator()); @@ -4134,10 +4132,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private void registerMeters() { - String addressName = address.toString(); - String queueName = name.toString(); - if (server != null && server.getMetricsManager() != null) { + String addressName = address.toString(); + String queueName = name.toString(); MetricsManager metricsManager = server.getMetricsManager(); metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {