ARTEMIS-3778 Streamline Expiration Reaping

Instead of holding a thread and an iterator, we should instead keep moving to next references
without holding any threads. Just with callbacks.
This commit is contained in:
Clebert Suconic 2022-04-13 13:17:44 -04:00 committed by clebertsuconic
parent ccfd4b7a62
commit f4bdacbc4c
5 changed files with 46 additions and 64 deletions

View File

@ -36,14 +36,14 @@ import org.jboss.logging.Logger;
public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private ScheduledExecutorService scheduledExecutorService;
protected ScheduledExecutorService scheduledExecutorService;
private boolean startedOwnScheduler;
/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
private TimeUnit timeUnit;
private final Executor executor;
protected final Executor executor;
private volatile boolean isStarted;
private ScheduledFuture future;
private final boolean onDemand;

View File

@ -30,11 +30,9 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@ -1846,43 +1844,37 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
}
volatile CountDownLatch inUseLatch;
@Override
public void stop() {
super.stop();
// this will do a best effort to stop the current latch.
// no big deal if it failed. this is just to optimize this component stop.
CountDownLatch latch = inUseLatch;
if (latch != null) {
latch.countDown();
}
}
private Iterator<Queue> iterator;
private Queue currentQueue;
@Override
public void run() {
// The reaper thread should be finished case the PostOffice is gone
// This is to avoid leaks on PostOffice between stops and starts
for (Queue queue : iterableOf(getLocalQueues())) {
if (!isStarted()) {
break;
}
try {
CountDownLatch latch = new CountDownLatch(1);
this.inUseLatch = latch;
queue.expireReferences(latch::countDown);
// the idea is in fact to block the Reaper while the Queue is executing reaping.
// This would avoid another eventual expiry to be called if the period for reaping is too small
// This should also avoid bursts in CPU consumption because of the expiry reaping
if (!latch.await(10, TimeUnit.SECONDS)) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(new TimeoutException(queue.getName().toString()));
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
}
if (iterator != null) {
logger.debugf("A previous reaping call has not finished yet, and it is currently working on %s", currentQueue);
return;
}
iterator = iterableOf(getLocalQueues()).iterator();
moveNext();
}
private void done() {
executor.execute(this::moveNext);
}
private void moveNext() {
if (!iterator.hasNext() || !this.isStarted()) {
iterator = null;
currentQueue = null;
return;
}
currentQueue = iterator.next();
// we will expire messages on this queue, once done we move to the next queue
currentQueue.expireReferences(this::done);
}
}

View File

@ -1823,10 +1823,6 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224012, value = "error releasing resources", format = Message.Format.MESSAGE_FORMAT)
void largeMessageErrorReleasingResources(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224013, value = "failed to expire messages for queue", format = Message.Format.MESSAGE_FORMAT)
void errorExpiringMessages(@Cause Exception e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224014, value = "Failed to close session", format = Message.Format.MESSAGE_FORMAT)
void errorClosingSession(@Cause Exception e);

View File

@ -295,8 +295,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private AddressSettingsRepositoryListener addressSettingsRepositoryListener;
private final ExpiryScanner expiryScanner = new ExpiryScanner();
private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
@ -2361,13 +2359,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
if (expiryScanner.scannerRunning.incrementAndGet() == 1) {
expiryScanner.doneCallback = done;
}
getExecutor().execute(expiryScanner);
if (!queueDestroyed) {
getExecutor().execute(new ExpiryScanner(done));
} else {
// expire is already happening on this queue, move on!
// queue is destroyed, move on
if (done != null) {
done.run();
}
@ -2388,24 +2383,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
class ExpiryScanner implements Runnable {
public Runnable doneCallback;
public AtomicInteger scannerRunning = new AtomicInteger(0);
private final Runnable doneCallback;
ExpiryScanner(Runnable doneCallback) {
this.doneCallback = doneCallback;
}
LinkedListIterator<MessageReference> iter = null;
@Override
public void run() {
boolean expired = false;
boolean hasElements = false;
int elementsIterated = 0;
int elementsExpired = 0;
boolean rescheduled = false;
LinkedList<MessageReference> expiredMessages = new LinkedList<>();
synchronized (QueueImpl.this) {
if (queueDestroyed) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Scanning for expires on " + QueueImpl.this.getName());
}
@ -2422,7 +2418,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
try {
while (postOffice.isStarted() && iter.hasNext()) {
while (!queueDestroyed && postOffice.isStarted() && iter.hasNext()) {
hasElements = true;
MessageReference ref = iter.next();
if (ref.getMessage().isExpired()) {
@ -2433,14 +2429,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
iter.remove();
}
if (++elementsIterated >= MAX_DELIVERIES_IN_LOOP) {
logger.debug("Breaking loop of expiring");
scannerRunning.incrementAndGet();
logger.debugf("Expiry Scanner on %s ran for %s iteration, scheduling a new one", QueueImpl.this.getName(), elementsIterated);
rescheduled = true;
getExecutor().execute(this);
break;
}
}
} finally {
if (scannerRunning.decrementAndGet() == 0) {
if (!rescheduled) {
logger.debugf("Scanning for expires on %s done", QueueImpl.this.getName());
if (server.hasBrokerQueuePlugins()) {
try {
server.callBrokerQueuePlugins((p) -> p.afterExpiryScan(QueueImpl.this));
@ -2454,12 +2452,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (doneCallback != null) {
doneCallback.run();
doneCallback = null;
}
}
logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
}
}

View File

@ -1670,7 +1670,7 @@ public class PagingTest extends ActiveMQTestBase {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact
.setMessageExpiryScanPeriod(500);
.setMessageExpiryScanPeriod(10);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);