This commit is contained in:
Clebert Suconic 2019-12-11 10:18:43 -05:00
commit 9c134808bf
1 changed files with 11 additions and 14 deletions

View File

@ -129,9 +129,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
protected static final int CRITICAL_CHECK_DEPAGE = 4;
private static final Logger logger = Logger.getLogger(QueueImpl.class);
private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
private static final AtomicLongFieldUpdater consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp");
private static final AtomicIntegerFieldUpdater<QueueImpl> dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
private static final AtomicLongFieldUpdater<QueueImpl> dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
private static final AtomicLongFieldUpdater<QueueImpl> consumerRemovedTimestampUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "consumerRemovedTimestamp");
private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@ -148,8 +148,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
public static final int DELIVERY_TIMEOUT = 1000;
private static final int FLUSH_TIMEOUT = 10000;
public static final int DEFAULT_FLUSH_LIMIT = 500;
private final long id;
@ -2962,8 +2960,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
long timeout = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
if (logger.isTraceEnabled()) {
@ -2994,6 +2990,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (logger.isDebugEnabled()) {
int maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
if (depaged == 0 && queueMemorySize.get() >= maxSize) {
logger.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
}
@ -3050,7 +3048,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
long redeliveryDelay = addressSettings.getRedeliveryDelay();
int deliveryCount = reference.getDeliveryCount();
// First check DLA
@ -3063,6 +3060,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
long redeliveryDelay = addressSettings.getRedeliveryDelay();
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);
@ -3950,7 +3948,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
//Readonly (no remove) iterator over the messages in the queue, in order of
//paging store, intermediateMessageReferences and MessageReferences
private class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
private final class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
LinkedListIterator<PagedReference> pagingIterator = null;
LinkedListIterator<MessageReference> messagesIterator = null;
@ -3962,10 +3960,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return pagingIterator;
}
Iterator lastIterator = null;
Iterator<? extends MessageReference> lastIterator = null;
MessageReference cachedNext = null;
HashSet<PagePosition> previouslyBrowsed = new HashSet();
HashSet<PagePosition> previouslyBrowsed = new HashSet<>();
private QueueBrowserIterator() {
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
@ -4134,10 +4132,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private void registerMeters() {
String addressName = address.toString();
String queueName = name.toString();
if (server != null && server.getMetricsManager() != null) {
String addressName = address.toString();
String queueName = name.toString();
MetricsManager metricsManager = server.getMetricsManager();
metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {