This commit is contained in:
Clebert Suconic 2018-06-06 16:12:32 -04:00
commit 2ca271648d
1 changed files with 49 additions and 45 deletions

View File

@ -906,7 +906,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
leaveCritical(CRITICAL_CONSUMER);
}
}
@Override
@ -1178,22 +1177,22 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
public synchronized int getScheduledCount() {
public int getScheduledCount() {
return scheduledDeliveryHandler.getScheduledCount();
}
@Override
public synchronized long getScheduledSize() {
public long getScheduledSize() {
return scheduledDeliveryHandler.getScheduledSize();
}
@Override
public synchronized int getDurableScheduledCount() {
public int getDurableScheduledCount() {
return scheduledDeliveryHandler.getDurableScheduledCount();
}
@Override
public synchronized long getDurableScheduledSize() {
public long getDurableScheduledSize() {
return scheduledDeliveryHandler.getDurableScheduledSize();
}
@ -1788,6 +1787,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
boolean expired = false;
boolean hasElements = false;
int elementsExpired = 0;
LinkedList<MessageReference> expiredMessages = new LinkedList<>();
synchronized (QueueImpl.this) {
if (queueDestroyed) {
return;
@ -1796,54 +1801,24 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
LinkedListIterator<MessageReference> iter = iterator();
boolean expired = false;
boolean hasElements = false;
int elementsExpired = 0;
try {
Transaction tx = null;
while (postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
try {
if (ref.getMessage().isExpired()) {
if (tx == null) {
tx = new TransactionImpl(storageManager);
}
incDelivering(ref);
expired = true;
expire(tx, ref);
iter.remove();
refRemoved(ref);
if (ref.getMessage().isExpired()) {
incDelivering(ref);
expired = true;
expiredMessages.add(ref);
iter.remove();
if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
getExecutor().execute(this);
break;
}
if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
getExecutor().execute(this);
break;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
}
}
logger.debug("Expired " + elementsExpired + " references");
try {
if (tx != null) {
tx.commit();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) {
scheduleDepage(true);
}
} finally {
try {
iter.close();
@ -1854,6 +1829,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
if (!expiredMessages.isEmpty()) {
Transaction tx = new TransactionImpl(storageManager);
for (MessageReference ref : expiredMessages) {
if (tx == null) {
tx = new TransactionImpl(storageManager);
}
try {
expire(tx, ref);
refRemoved(ref);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
}
}
try {
tx.commit();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
}
logger.debug("Expired " + elementsExpired + " references");
}
// If empty we need to schedule depaging to make sure we would depage expired messages as well
if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) {
scheduleDepage(true);
}
}
}