This commit is contained in:
Clebert Suconic 2020-10-28 09:53:07 -04:00
commit 4d6096f88d
2 changed files with 91 additions and 54 deletions

View File

@ -20,11 +20,12 @@ package org.apache.activemq.artemis.core.server;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.jboss.logging.Logger;
@ -41,15 +42,13 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
/** initialDelay < 0 would mean no initial delay, use the period instead */
private long initialDelay;
private long period;
private long millisecondsPeriod;
private TimeUnit timeUnit;
private final Executor executor;
private volatile ScheduledFuture future;
private volatile boolean isStarted;
private ScheduledFuture future;
private final boolean onDemand;
long lastTime = 0;
private final AtomicInteger delayed = new AtomicInteger(0);
// The start/stop actions shouldn't interact concurrently with delay so it doesn't need to be volatile
private AtomicBoolean bookedForRunning;
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
@ -73,6 +72,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this.bookedForRunning = new AtomicBoolean(false);
this.isStarted = false;
}
/**
@ -89,12 +90,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this.executor = null;
this.scheduledExecutorService = scheduledExecutorService;
this.initialDelay = initialDelay;
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
this(scheduledExecutorService, null, initialDelay, checkPeriod, timeUnit, onDemand);
}
/**
@ -150,11 +146,11 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
@Override
public synchronized void start() {
if (future != null) {
if (isStarted) {
// already started
return;
}
isStarted = true;
if (scheduledExecutorService == null) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, getThreadFactory());
startedOwnScheduler = true;
@ -165,10 +161,9 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
return;
}
this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay >= 0 ? initialDelay : period, period, timeUnit);
final AtomicBoolean booked = this.bookedForRunning;
future = scheduledExecutorService.scheduleWithFixedDelay(() -> runForScheduler(booked), initialDelay >= 0 ? initialDelay : period, period, timeUnit);
} else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
@ -188,15 +183,26 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
public void delay() {
int value = delayed.incrementAndGet();
if (value > 10) {
delayed.decrementAndGet();
} else {
// We only schedule up to 10 periods upfront.
// this is to avoid a window where a current one would be running and a next one is coming.
// in theory just 2 would be enough. I'm using 10 as a precaution here.
scheduledExecutorService.schedule(runForScheduler, Math.min(period, period * value), timeUnit);
/**
* A delay request can succeed only if:
* <ul>
* <li>there is no other pending delay request
* <li>there is no pending execution request
* </ul>
* <p>
* When a delay request succeed it schedule a new execution to happen in {@link #getPeriod()}.<br>
*/
public boolean delay() {
final AtomicBoolean booked = this.bookedForRunning;
if (!booked.compareAndSet(false, true)) {
return false;
}
try {
scheduledExecutorService.schedule(() -> bookedRunForScheduler(booked), period, timeUnit);
return true;
} catch (RejectedExecutionException e) {
booked.set(false);
throw e;
}
}
@ -261,7 +267,14 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
public void stop() {
public synchronized void stop() {
if (!isStarted) {
return;
}
isStarted = false;
// Replace the existing one: a new periodic task or any new delay after stop
// won't interact with the previously running ones
this.bookedForRunning = new AtomicBoolean(false);
if (future != null) {
future.cancel(false);
future = null;
@ -275,8 +288,8 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
@Override
public synchronized boolean isStarted() {
return future != null;
public boolean isStarted() {
return isStarted;
}
// this will restart the scheduled component upon changes
@ -287,35 +300,43 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
}
}
final Runnable runForExecutor = new Runnable() {
@Override
public void run() {
if (onDemand && delayed.get() > 0) {
delayed.decrementAndGet();
}
private void runForExecutor(AtomicBoolean booked) {
// It unblocks:
// - a new delay request
// - next periodic run request (in case of executor != null)
// Although tempting, don't move this one after ActiveMQScheduledComponent.this.run():
// - it can cause "delay" to change semantic ie a racing delay while finished executing the task, won't succeed
// - it won't prevent "slow tasks" to accumulate, because slowness cannot be measured inside running method;
// it just cause skipping runs for perfectly timed executions too
boolean alwaysTrue = booked.compareAndSet(true, false);
assert alwaysTrue;
ActiveMQScheduledComponent.this.run();
}
if (!onDemand && lastTime > 0) {
if (System.currentTimeMillis() - lastTime < millisecondsPeriod) {
logger.trace("Execution ignored due to too many simultaneous executions, probably a previous delayed execution");
return;
private void bookedRunForScheduler(AtomicBoolean booked) {
assert booked.get();
if (executor != null) {
try {
executor.execute(() -> runForExecutor(booked));
} catch (RejectedExecutionException e) {
if (booked != null) {
booked.set(false);
}
throw e;
}
lastTime = System.currentTimeMillis();
ActiveMQScheduledComponent.this.run();
} else {
runForExecutor(booked);
}
};
}
final Runnable runForScheduler = new Runnable() {
@Override
public void run() {
if (executor != null) {
executor.execute(runForExecutor);
} else {
runForExecutor.run();
}
private void runForScheduler(AtomicBoolean booked) {
if (!booked.compareAndSet(false, true)) {
// let's skip this execution because there is:
// - a previously submitted period task yet to start -> executor is probably overbooked!
// - a pending delay request
return;
}
};
bookedRunForScheduler(booked);
}
}

View File

@ -78,6 +78,22 @@ public class ActiveMQScheduledComponentTest {
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5);
}
@Test
public void testSubMillisDelay() throws InterruptedException {
final CountDownLatch triggered = new CountDownLatch(2);
final long nsInterval = TimeUnit.MICROSECONDS.toNanos(900);
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, nsInterval, TimeUnit.NANOSECONDS, false) {
@Override
public void run() {
triggered.countDown();
}
};
local.start();
Assert.assertTrue(triggered.await(10, TimeUnit.SECONDS));
local.stop();
}
@Test
public void testVerifyInitialDelayChanged() {
final long initialDelay = 10;