diff --git a/core/pom.xml b/core/pom.xml
index e0ccfa1a04b..72fea546892 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -224,6 +224,10 @@
org.antlr
antlr4-runtime
+
+ io.timeandspace
+ cron-scheduler
+
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
index 029dd478000..4fbefb88e5c 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java
@@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
+
+ private volatile Future> scheduledFuture;
@Override
public void start()
@@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor
}
public abstract boolean doMonitor(ServiceEmitter emitter);
+
+ @Override
+ public Future> getScheduledFuture()
+ {
+ return scheduledFuture;
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture)
+ {
+ this.scheduledFuture = scheduledFuture;
+ }
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
index 9811f584bc0..6649312d4a2 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java
@@ -24,10 +24,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List monitors;
+
+ private volatile Future> scheduledFuture;
public CompoundMonitor(List monitors)
{
@@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}
+ @Override
+ public Future> getScheduledFuture()
+ {
+ return scheduledFuture;
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture)
+ {
+ this.scheduledFuture = scheduledFuture;
+ }
+
public abstract boolean shouldReschedule(List reschedules);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
index 2ccd5db3ca6..8a3975e57e2 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java
@@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.concurrent.Future;
+
+
/**
*/
public interface Monitor
@@ -35,4 +38,8 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);
+
+ Future> getScheduledFuture();
+
+ void setScheduledFuture(Future> scheduledFuture);
}
diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
index 2adbe9510a3..961f8239482 100644
--- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
+++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java
@@ -20,41 +20,52 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
*/
public class MonitorScheduler
{
+
+ private static final Logger log = new Logger(MonitorScheduler.class);
+
private final MonitorSchedulerConfig config;
- private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private final Set monitors;
private final Object lock = new Object();
+ private final CronScheduler scheduler;
+ private final ExecutorService executorService;
private volatile boolean started = false;
-
+
public MonitorScheduler(
MonitorSchedulerConfig config,
- ScheduledExecutorService exec,
+ CronScheduler scheduler,
ServiceEmitter emitter,
- List monitors
+ List monitors,
+ ExecutorService executorService
)
{
this.config = config;
- this.exec = exec;
+ this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
+ this.executorService = executorService;
}
@LifecycleStart
@@ -124,24 +135,47 @@ public class MonitorScheduler
{
synchronized (lock) {
monitor.start();
- ScheduledExecutors.scheduleAtFixedRate(
- exec,
- config.getEmitterPeriod(),
- new Callable()
+ long rate = config.getEmitterPeriod().getMillis();
+ Future> scheduledFuture = scheduler.scheduleAtFixedRate(
+ rate,
+ rate,
+ TimeUnit.MILLISECONDS,
+ new CronTask()
{
+ private volatile Future monitorFuture = null;
@Override
- public ScheduledExecutors.Signal call()
+ public void run(long scheduledRunTimeMillis)
{
- // Run one more time even if the monitor was removed, in case there's some extra data to flush
- if (monitor.monitor(emitter) && hasMonitor(monitor)) {
- return ScheduledExecutors.Signal.REPEAT;
- } else {
- removeMonitor(monitor);
- return ScheduledExecutors.Signal.STOP;
+ try {
+ if (monitorFuture != null && monitorFuture.isDone()
+ && !(monitorFuture.get() && hasMonitor(monitor))) {
+ removeMonitor(monitor);
+ monitor.getScheduledFuture().cancel(false);
+ log.debug("Stopped rescheduling %s (delay %s)", this, rate);
+ return;
+ }
+ log.trace("Running %s (period %s)", this, rate);
+ monitorFuture = executorService.submit(new Callable()
+ {
+ @Override
+ public Boolean call()
+ {
+ try {
+ return monitor.monitor(emitter);
+ }
+ catch (Throwable e) {
+ log.error(e, "Uncaught exception.");
+ return Boolean.FALSE;
+ }
+ }
+ });
+ }
+ catch (Throwable e) {
+ log.error(e, "Uncaught exception.");
}
}
- }
- );
+ });
+ monitor.setScheduledFuture(scheduledFuture);
}
}
@@ -151,4 +185,5 @@ public class MonitorScheduler
return monitors.contains(monitor);
}
}
+
}
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
index 76671968b91..da2ba5926f5 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java
@@ -19,17 +19,41 @@
package org.apache.druid.java.util.metrics;
+
import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.concurrent.Execs;
+import io.timeandspace.cronscheduler.CronScheduler;
+import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
public class MonitorSchedulerTest
{
+
+ @Mock
+ private CronScheduler cronScheduler;
+
+ @Before
+ public void setUp()
+ {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
public void testFindMonitor()
{
@@ -45,12 +69,15 @@ public class MonitorSchedulerTest
final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();
+
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
- Execs.scheduledSingleThreaded("monitor-scheduler-test"),
+ CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
- ImmutableList.of(monitor1, monitor2)
+ ImmutableList.of(monitor1, monitor2),
+ executor
);
final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class);
@@ -62,7 +89,264 @@ public class MonitorSchedulerTest
Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}
+
+ @Test
+ public void testStart_RepeatScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Mockito.doAnswer(new Answer>()
+ {
+ private int scheduleCount = 0;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.TRUE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
+
+ }
+
+ @Test
+ public void testStart_RepeatAndStopScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ private int scheduleCount = 0;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.FALSE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ while (scheduleCount < 2) {
+ scheduleCount++;
+ task.run(123L);
+ }
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ Mockito.verify(monitor, Mockito.times(1)).stop();
+
+ }
+
+ @Test
+ public void testStart_UnexpectedExceptionWhileMonitoring()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+ Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException());
+
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @Override
+ public Future answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[0];
+ ((Callable) originalArgument).call();
+ return CompletableFuture.completedFuture(Boolean.TRUE);
+ }
+ }).when(executor).submit(ArgumentMatchers.any(Callable.class));
+
+ task.run(123L);
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
+ }
+
+
+ @Test
+ public void testStart_UnexpectedExceptionWhileScheduling()
+ {
+ ExecutorService executor = Mockito.mock(ExecutorService.class);
+ Monitor monitor = Mockito.mock(Monitor.class);
+ Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
+ MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
+ Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
+
+
+ Mockito.doAnswer(new Answer>()
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future> answer(InvocationOnMock invocation) throws Exception
+ {
+ final Object originalArgument = (invocation.getArguments())[3];
+ CronTask task = ((CronTask) originalArgument);
+
+ Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException());
+ task.run(123L);
+ return createDummyFuture();
+ }
+ }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+
+
+ final MonitorScheduler scheduler = new MonitorScheduler(
+ config,
+ cronScheduler,
+ Mockito.mock(ServiceEmitter.class),
+ ImmutableList.of(monitor),
+ executor);
+ scheduler.start();
+
+ Mockito.verify(monitor, Mockito.times(1)).start();
+ Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
+ ArgumentMatchers.anyLong(),
+ ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
+ Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
+ }
+
+
+ private Future createDummyFuture()
+ {
+ Future> future = new Future()
+ {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return false;
+ }
+
+ @Override
+ public Object get()
+ {
+ return null;
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit)
+ {
+ return null;
+ }
+
+ };
+
+ return future;
+ }
+
+
private static class NoopMonitor implements Monitor
{
@Override
@@ -82,5 +366,19 @@ public class MonitorSchedulerTest
{
return true;
}
+
+ @Override
+ public Future> getScheduledFuture()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setScheduledFuture(Future> scheduledFuture)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
diff --git a/licenses.yaml b/licenses.yaml
index 170b3d147fd..e684073d81d 100644
--- a/licenses.yaml
+++ b/licenses.yaml
@@ -345,6 +345,16 @@ libraries:
---
+name: CronScheduler
+license_category: binary
+module: java-core
+license_name: Apache License version 2.0
+version: 0.1
+libraries:
+ - io.timeandspace: cron-scheduler
+
+---
+
name: LMAX Disruptor
license_category: binary
module: java-core
diff --git a/pom.xml b/pom.xml
index 383de94be67..ffe9460df06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1255,6 +1255,11 @@
1.19.0
test
+
+ io.timeandspace
+ cron-scheduler
+ 0.1
+
diff --git a/server/pom.xml b/server/pom.xml
index 7fa3a488e71..caddf483bdc 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -311,6 +311,10 @@
io.github.resilience4j
resilience4j-bulkhead
+
+ io.timeandspace
+ cron-scheduler
+
diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
index 71b99454125..d7598aeacfd 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java
@@ -27,6 +27,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
+import io.timeandspace.cronscheduler.CronScheduler;
import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.query.ExecutorServiceMonitor;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -106,9 +108,10 @@ public class MetricsModule implements Module
return new MonitorScheduler(
config.get(),
- Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
+ CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
emitter,
- monitors
+ monitors,
+ Execs.multiThreaded(64, "MonitorThread-%d")
);
}