ARTEMIS-1462 Allow ActiveMQScheduledComponent initial delay configuration

It contains:
 - an improved documentation of the constructors
 - the initial delay configuration
This commit is contained in:
Francesco Nigro 2017-10-13 09:15:41 +02:00 committed by Clebert Suconic
parent b09ea433b5
commit 40f49ef0bc
2 changed files with 125 additions and 6 deletions

View File

@ -37,6 +37,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class); private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private boolean startedOwnScheduler; private boolean startedOwnScheduler;
private long initialDelay;
private long period; private long period;
private long millisecondsPeriod; private long millisecondsPeriod;
private TimeUnit timeUnit; private TimeUnit timeUnit;
@ -48,27 +49,79 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
private final AtomicInteger delayed = new AtomicInteger(0); private final AtomicInteger delayed = new AtomicInteger(0);
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
*
* @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically trigger {@link #run()} on the configured {@code executor}
* @param executor the {@link Executor} that execute {@link #run()} when triggered
* @param initialDelay the time to delay first execution
* @param checkPeriod the delay between the termination of one execution and the start of the next
* @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
* @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
*/
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
Executor executor, Executor executor,
long initialDelay,
long checkPeriod, long checkPeriod,
TimeUnit timeUnit, TimeUnit timeUnit,
boolean onDemand) { boolean onDemand) {
this.executor = executor; this.executor = executor;
this.scheduledExecutorService = scheduledExecutorService; this.scheduledExecutorService = scheduledExecutorService;
this.initialDelay = initialDelay;
this.period = checkPeriod; this.period = checkPeriod;
this.timeUnit = timeUnit; this.timeUnit = timeUnit;
this.onDemand = onDemand; this.onDemand = onDemand;
} }
/** /**
* This is useful for cases where we want our own scheduler executor. * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
* *
* @param checkPeriod * <p>
* @param timeUnit * The component created will have {@code initialDelay} defaulted to {@code checkPeriod}.
* @param onDemand *
* @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically trigger {@link #run()} on the configured {@code executor}
* @param executor the {@link Executor} that execute {@link #run()} when triggered
* @param checkPeriod the delay between the termination of one execution and the start of the next
* @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
* @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
*/
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
Executor executor,
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this(scheduledExecutorService, executor, checkPeriod, checkPeriod, timeUnit, onDemand);
}
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
*
* <p>
* This is useful for cases where we want our own scheduler executor: on {@link #start()} it will create a fresh new single-threaded {@link ScheduledExecutorService}
* using {@link #getThreadFactory()} and {@link #getThisClassLoader()}, while on {@link #stop()} it will garbage it.
*
* @param initialDelay the time to delay first execution
* @param checkPeriod the delay between the termination of one execution and the start of the next
* @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
* @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
*/
public ActiveMQScheduledComponent(long initialDelay, long checkPeriod, TimeUnit timeUnit, boolean onDemand) {
this(null, null, initialDelay, checkPeriod, timeUnit, onDemand);
}
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
*
* <p>
* This is useful for cases where we want our own scheduler executor.
* The component created will have {@code initialDelay} defaulted to {@code checkPeriod}.
*
* @param checkPeriod the delay between the termination of one execution and the start of the next
* @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
* @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
*/ */
public ActiveMQScheduledComponent(long checkPeriod, TimeUnit timeUnit, boolean onDemand) { public ActiveMQScheduledComponent(long checkPeriod, TimeUnit timeUnit, boolean onDemand) {
this(null, null, checkPeriod, timeUnit, onDemand); this(null, null, checkPeriod, checkPeriod, timeUnit, onDemand);
} }
@Override @Override
@ -91,7 +144,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS); this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS);
if (period >= 0) { if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit); future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay, period, timeUnit);
} else { } else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period); logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
} }
@ -133,6 +186,39 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
return this; return this;
} }
public long getInitialDelay() {
return initialDelay;
}
public synchronized ActiveMQScheduledComponent setInitialDelay(long initialDelay) {
this.initialDelay = initialDelay;
restartIfNeeded();
return this;
}
/**
* Useful to change a running schedule and avoid multiple restarts.
*/
public synchronized ActiveMQScheduledComponent setInitialDelayAndPeriod(long initialDelay, long period) {
this.period = period;
this.initialDelay = initialDelay;
restartIfNeeded();
return this;
}
/**
* Useful to change a running schedule and avoid multiple restarts.
*/
public synchronized ActiveMQScheduledComponent setInitialDelayAndPeriod(long initialDelay,
long period,
TimeUnit timeUnit) {
this.period = period;
this.initialDelay = initialDelay;
this.timeUnit = timeUnit;
restartIfNeeded();
return this;
}
public TimeUnit getTimeUnit() { public TimeUnit getTimeUnit() {
return timeUnit; return timeUnit;
} }

View File

@ -165,4 +165,37 @@ public class ActiveMQScheduledComponentTest {
} }
} }
@Test
public void testUsingCustomInitialDelay() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final long initialDelayMillis = 100;
final long checkPeriodMillis = 100 * initialDelayMillis;
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, initialDelayMillis, checkPeriodMillis, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
latch.countDown();
}
};
final long start = System.nanoTime();
local.start();
try {
final boolean triggeredBeforePeriod = latch.await(local.getPeriod(), local.getTimeUnit());
final long timeToFirstTrigger = TimeUnit.NANOSECONDS.convert(System.nanoTime() - start, local.getTimeUnit());
Assert.assertTrue("Takes too long to start", triggeredBeforePeriod);
Assert.assertTrue("Started too early", timeToFirstTrigger >= local.getInitialDelay());
} finally {
local.stop();
}
}
@Test
public void testVerifyDefaultInitialDelay() throws InterruptedException {
final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) {
@Override
public void run() {
}
};
Assert.assertEquals("The initial delay must be defaulted to the period", local.getPeriod(), local.getInitialDelay());
}
} }