From 40f49ef0bca0cba7fd3df22a807e46e36a59f2c7 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 13 Oct 2017 09:15:41 +0200 Subject: [PATCH] ARTEMIS-1462 Allow ActiveMQScheduledComponent initial delay configuration It contains: - an improved documentation of the constructors - the initial delay configuration --- .../server/ActiveMQScheduledComponent.java | 98 +++++++++++++++++-- .../utils/ActiveMQScheduledComponentTest.java | 33 +++++++ 2 files changed, 125 insertions(+), 6 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java index 9524d89547..d891dd5e5d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java @@ -37,6 +37,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class); private ScheduledExecutorService scheduledExecutorService; private boolean startedOwnScheduler; + private long initialDelay; private long period; private long millisecondsPeriod; private TimeUnit timeUnit; @@ -48,27 +49,79 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R private final AtomicInteger delayed = new AtomicInteger(0); + /** + * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}. + * + * @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically trigger {@link #run()} on the configured {@code executor} + * @param executor the {@link Executor} that execute {@link #run()} when triggered + * @param initialDelay the time to delay first execution + * @param checkPeriod the delay between the termination of one execution and the start of the next + * @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters + * @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise + */ public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, Executor executor, + long initialDelay, long checkPeriod, TimeUnit timeUnit, boolean onDemand) { this.executor = executor; this.scheduledExecutorService = scheduledExecutorService; + this.initialDelay = initialDelay; this.period = checkPeriod; this.timeUnit = timeUnit; this.onDemand = onDemand; } /** - * This is useful for cases where we want our own scheduler executor. + * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}. * - * @param checkPeriod - * @param timeUnit - * @param onDemand + *

+ * The component created will have {@code initialDelay} defaulted to {@code checkPeriod}. + * + * @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically trigger {@link #run()} on the configured {@code executor} + * @param executor the {@link Executor} that execute {@link #run()} when triggered + * @param checkPeriod the delay between the termination of one execution and the start of the next + * @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters + * @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise + */ + public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService, + Executor executor, + long checkPeriod, + TimeUnit timeUnit, + boolean onDemand) { + this(scheduledExecutorService, executor, checkPeriod, checkPeriod, timeUnit, onDemand); + } + + /** + * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}. + * + *

+ * This is useful for cases where we want our own scheduler executor: on {@link #start()} it will create a fresh new single-threaded {@link ScheduledExecutorService} + * using {@link #getThreadFactory()} and {@link #getThisClassLoader()}, while on {@link #stop()} it will garbage it. + * + * @param initialDelay the time to delay first execution + * @param checkPeriod the delay between the termination of one execution and the start of the next + * @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters + * @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise + */ + public ActiveMQScheduledComponent(long initialDelay, long checkPeriod, TimeUnit timeUnit, boolean onDemand) { + this(null, null, initialDelay, checkPeriod, timeUnit, onDemand); + } + + /** + * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}. + * + *

+ * This is useful for cases where we want our own scheduler executor. + * The component created will have {@code initialDelay} defaulted to {@code checkPeriod}. + * + * @param checkPeriod the delay between the termination of one execution and the start of the next + * @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters + * @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise */ public ActiveMQScheduledComponent(long checkPeriod, TimeUnit timeUnit, boolean onDemand) { - this(null, null, checkPeriod, timeUnit, onDemand); + this(null, null, checkPeriod, checkPeriod, timeUnit, onDemand); } @Override @@ -91,7 +144,7 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R this.millisecondsPeriod = timeUnit.convert(period, TimeUnit.MILLISECONDS); if (period >= 0) { - future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, period, period, timeUnit); + future = scheduledExecutorService.scheduleWithFixedDelay(runForScheduler, initialDelay, period, timeUnit); } else { logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period); } @@ -133,6 +186,39 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R return this; } + public long getInitialDelay() { + return initialDelay; + } + + public synchronized ActiveMQScheduledComponent setInitialDelay(long initialDelay) { + this.initialDelay = initialDelay; + restartIfNeeded(); + return this; + } + + /** + * Useful to change a running schedule and avoid multiple restarts. + */ + public synchronized ActiveMQScheduledComponent setInitialDelayAndPeriod(long initialDelay, long period) { + this.period = period; + this.initialDelay = initialDelay; + restartIfNeeded(); + return this; + } + + /** + * Useful to change a running schedule and avoid multiple restarts. + */ + public synchronized ActiveMQScheduledComponent setInitialDelayAndPeriod(long initialDelay, + long period, + TimeUnit timeUnit) { + this.period = period; + this.initialDelay = initialDelay; + this.timeUnit = timeUnit; + restartIfNeeded(); + return this; + } + public TimeUnit getTimeUnit() { return timeUnit; } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java index 76bdea610e..25cc3e110e 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java @@ -165,4 +165,37 @@ public class ActiveMQScheduledComponentTest { } } + @Test + public void testUsingCustomInitialDelay() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final long initialDelayMillis = 100; + final long checkPeriodMillis = 100 * initialDelayMillis; + final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, initialDelayMillis, checkPeriodMillis, TimeUnit.MILLISECONDS, false) { + @Override + public void run() { + latch.countDown(); + } + }; + final long start = System.nanoTime(); + local.start(); + try { + final boolean triggeredBeforePeriod = latch.await(local.getPeriod(), local.getTimeUnit()); + final long timeToFirstTrigger = TimeUnit.NANOSECONDS.convert(System.nanoTime() - start, local.getTimeUnit()); + Assert.assertTrue("Takes too long to start", triggeredBeforePeriod); + Assert.assertTrue("Started too early", timeToFirstTrigger >= local.getInitialDelay()); + } finally { + local.stop(); + } + } + + @Test + public void testVerifyDefaultInitialDelay() throws InterruptedException { + final ActiveMQScheduledComponent local = new ActiveMQScheduledComponent(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, false) { + @Override + public void run() { + + } + }; + Assert.assertEquals("The initial delay must be defaulted to the period", local.getPeriod(), local.getInitialDelay()); + } }