ARTEMIS-2926 Scheduled task executions are skipped randomly

Making Scheduled task to be more reliable when
using scheduledComponent.delay() method and saving
periodic tasks to be skipped although on correct timing
This commit is contained in:
franz1981 2020-10-02 18:58:26 +02:00 committed by Clebert Suconic
parent 647151b0af
commit e2c1848da4
2 changed files with 91 additions and 54 deletions

View File

@ -20,11 +20,12 @@ package org.apache.activemq.artemis.core.server;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
@ -41,15 +42,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
private volatile ScheduledFuture future;
private volatile boolean isStarted;
private ScheduledFuture future;
private final boolean onDemand;
long lastTime = 0;
private final AtomicInteger delayed = new AtomicInteger(0);
// The start/stop actions shouldn't interact concurrently with delay so it doesn't need to be volatile
private AtomicBoolean bookedForRunning;
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
@ -73,6 +72,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this.bookedForRunning = new AtomicBoolean(false);
this.isStarted = false;
}
/**
@ -89,12 +90,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this.executor = null;
this.scheduledExecutorService = scheduledExecutorService;
this.initialDelay = initialDelay;
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this(scheduledExecutorService, null, initialDelay, checkPeriod, timeUnit, onDemand);
}
/**
@ -150,11 +146,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
@Override
public synchronized void start() {
if (future != null) {
if (isStarted) {
// already started
return;
}
isStarted = true;
if (scheduledExecutorService == null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
startedOwnScheduler = true;
@ -165,10 +161,9 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
return;
}
this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
final AtomicBoolean booked = this.bookedForRunning;
future = scheduledExecutorService.scheduleWithFixedDelay(() -> runForScheduler(booked), initialDelay >= 0 ? initialDelay : period, period, timeUnit);
} else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
@ -188,15 +183,26 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
public void delay() {
int value = delayed.incrementAndGet();
if (value > 10) {
delayed.decrementAndGet();
} else {
// We only schedule up to 10 periods upfront.
// this is to avoid a window where a current one would be running and a next one is coming.
// in theory just 2 would be enough. I'm using 10 as a precaution here.
scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
/**
* A delay request can succeed only if:
* <ul>
* <li>there is no other pending delay request
* <li>there is no pending execution request
* </ul>
* <p>
* When a delay request succeed it schedule a new execution to happen in {@link #getPeriod()}.<br>
*/
public boolean delay() {
final AtomicBoolean booked = this.bookedForRunning;
if (!booked.compareAndSet(false, true)) {
return false;
}
try {
scheduledExecutorService.schedule(() -> bookedRunForScheduler(booked), period, timeUnit);
return true;
} catch (RejectedExecutionException e) {
booked.set(false);
throw e;
}
}
@ -261,7 +267,14 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
public void stop() {
public synchronized void stop() {
if (!isStarted) {
return;
}
isStarted = false;
// Replace the existing one: a new periodic task or any new delay after stop
// won't interact with the previously running ones
this.bookedForRunning = new AtomicBoolean(false);
if (future != null) {
future.cancel(false);
future = null;
@ -275,8 +288,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
public synchronized boolean isStarted() {
return future != null;
public boolean isStarted() {
return isStarted;
}
// this will restart the scheduled component upon changes
@ -287,35 +300,43 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
}
final Runnable runForExecutor = new Runnable() {
@Override
public void run() {
if (onDemand && delayed.get() > 0) {
delayed.decrementAndGet();
}
private void runForExecutor(AtomicBoolean booked) {
// It unblocks:
// - a new delay request
// - next periodic run request (in case of executor != null)
// Although tempting, don't move this one after ActiveMQScheduledComponent.this.run():
// - it can cause "delay" to change semantic ie a racing delay while finished executing the task, won't succeed
// - it won't prevent "slow tasks" to accumulate, because slowness cannot be measured inside running method;
// it just cause skipping runs for perfectly timed executions too
boolean alwaysTrue = booked.compareAndSet(true, false);
assert alwaysTrue;
ActiveMQScheduledComponent.this.run();
}
if (!onDemand && lastTime > 0) {
if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
return;
private void bookedRunForScheduler(AtomicBoolean booked) {
assert booked.get();
if (executor != null) {
try {
executor.execute(() -> runForExecutor(booked));
} catch (RejectedExecutionException e) {
if (booked != null) {
booked.set(false);
}
throw e;
}
lastTime = System.currentTimeMillis();
ActiveMQScheduledComponent.this.run();
} else {
runForExecutor(booked);
}
};
}
final Runnable runForScheduler = new Runnable() {
@Override
public void run() {
if (executor != null) {
executor.execute(runForExecutor);
} else {
runForExecutor.run();
}
private void runForScheduler(AtomicBoolean booked) {
if (!booked.compareAndSet(false, true)) {
// let's skip this execution because there is:
// - a previously submitted period task yet to start -> executor is probably overbooked!
// - a pending delay request
return;
}
};
bookedRunForScheduler(booked);
}
}

View File

@ -78,6 +78,22 @@ public class ActiveMQScheduledComponentTest {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}
@Test
public void testSubMillisDelay() throws InterruptedException {
final CountDownLatch triggered = new CountDownLatch(2);
final long nsInterval = TimeUnit.MICROSECONDS.toNanos(900);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, nsInterval, TimeUnit.NANOSECONDS, false) {
@Override
public void run() {
triggered.countDown();
}
};
local.start();
Assert.assertTrue(triggered.await(10, TimeUnit.SECONDS));
local.stop();
}
@Test
public void testVerifyInitialDelayChanged() {
final long initialDelay = 10;