From d0c2ede50c94bef89b72261fe3e553c60e7f0013 Mon Sep 17 00:00:00 2001 From: Ayush Kulshrestha Date: Wed, 25 Nov 2020 17:01:38 +0530 Subject: [PATCH] Added CronScheduler support as a proof to clock drift while emitting metrics (#10448) Co-authored-by: Ayush Kulshrestha --- core/pom.xml | 4 + .../java/util/metrics/AbstractMonitor.java | 17 + .../java/util/metrics/CompoundMonitor.java | 15 + .../druid/java/util/metrics/Monitor.java | 7 + .../java/util/metrics/MonitorScheduler.java | 75 +++-- .../util/metrics/MonitorSchedulerTest.java | 304 +++++++++++++++++- licenses.yaml | 10 + pom.xml | 5 + server/pom.xml | 4 + .../druid/server/metrics/MetricsModule.java | 7 +- 10 files changed, 423 insertions(+), 25 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index e0ccfa1a04b..72fea546892 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -224,6 +224,10 @@ org.antlr antlr4-runtime + + io.timeandspace + cron-scheduler + diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java index 029dd478000..4fbefb88e5c 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java @@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public abstract class AbstractMonitor implements Monitor { private volatile boolean started = false; + + private volatile Future scheduledFuture; @Override public void start() @@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor } public abstract boolean doMonitor(ServiceEmitter emitter); + + @Override + public Future getScheduledFuture() + { + return scheduledFuture; + } + + @Override + public void setScheduledFuture(Future scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java index 9811f584bc0..6649312d4a2 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java @@ -24,10 +24,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Future; public abstract class CompoundMonitor implements Monitor { private final List monitors; + + private volatile Future scheduledFuture; public CompoundMonitor(List monitors) { @@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter))); } + @Override + public Future getScheduledFuture() + { + return scheduledFuture; + } + + @Override + public void setScheduledFuture(Future scheduledFuture) + { + this.scheduledFuture = scheduledFuture; + } + public abstract boolean shouldReschedule(List reschedules); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java index 2ccd5db3ca6..8a3975e57e2 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java @@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import java.util.concurrent.Future; + + /** */ public interface Monitor @@ -35,4 +38,8 @@ public interface Monitor * @return true if this monitor needs to continue monitoring. False otherwise. */ boolean monitor(ServiceEmitter emitter); + + Future getScheduledFuture(); + + void setScheduledFuture(Future scheduledFuture); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 2adbe9510a3..961f8239482 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -20,41 +20,52 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.Sets; +import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** */ public class MonitorScheduler { + + private static final Logger log = new Logger(MonitorScheduler.class); + private final MonitorSchedulerConfig config; - private final ScheduledExecutorService exec; private final ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); + private final CronScheduler scheduler; + private final ExecutorService executorService; private volatile boolean started = false; - + public MonitorScheduler( MonitorSchedulerConfig config, - ScheduledExecutorService exec, + CronScheduler scheduler, ServiceEmitter emitter, - List monitors + List monitors, + ExecutorService executorService ) { this.config = config; - this.exec = exec; + this.scheduler = scheduler; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); + this.executorService = executorService; } @LifecycleStart @@ -124,24 +135,47 @@ public class MonitorScheduler { synchronized (lock) { monitor.start(); - ScheduledExecutors.scheduleAtFixedRate( - exec, - config.getEmitterPeriod(), - new Callable() + long rate = config.getEmitterPeriod().getMillis(); + Future scheduledFuture = scheduler.scheduleAtFixedRate( + rate, + rate, + TimeUnit.MILLISECONDS, + new CronTask() { + private volatile Future monitorFuture = null; @Override - public ScheduledExecutors.Signal call() + public void run(long scheduledRunTimeMillis) { - // Run one more time even if the monitor was removed, in case there's some extra data to flush - if (monitor.monitor(emitter) && hasMonitor(monitor)) { - return ScheduledExecutors.Signal.REPEAT; - } else { - removeMonitor(monitor); - return ScheduledExecutors.Signal.STOP; + try { + if (monitorFuture != null && monitorFuture.isDone() + && !(monitorFuture.get() && hasMonitor(monitor))) { + removeMonitor(monitor); + monitor.getScheduledFuture().cancel(false); + log.debug("Stopped rescheduling %s (delay %s)", this, rate); + return; + } + log.trace("Running %s (period %s)", this, rate); + monitorFuture = executorService.submit(new Callable() + { + @Override + public Boolean call() + { + try { + return monitor.monitor(emitter); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); + return Boolean.FALSE; + } + } + }); + } + catch (Throwable e) { + log.error(e, "Uncaught exception."); } } - } - ); + }); + monitor.setScheduledFuture(scheduledFuture); } } @@ -151,4 +185,5 @@ public class MonitorScheduler return monitors.contains(monitor); } } + } diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java index 76671968b91..da2ba5926f5 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java @@ -19,17 +19,41 @@ package org.apache.druid.java.util.metrics; + import com.google.common.collect.ImmutableList; -import org.apache.druid.java.util.common.concurrent.Execs; +import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import java.time.Duration; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class MonitorSchedulerTest { + + @Mock + private CronScheduler cronScheduler; + + @Before + public void setUp() + { + MockitoAnnotations.initMocks(this); + } + @Test public void testFindMonitor() { @@ -45,12 +69,15 @@ public class MonitorSchedulerTest final Monitor1 monitor1 = new Monitor1(); final Monitor2 monitor2 = new Monitor2(); + + ExecutorService executor = Mockito.mock(ExecutorService.class); final MonitorScheduler scheduler = new MonitorScheduler( Mockito.mock(MonitorSchedulerConfig.class), - Execs.scheduledSingleThreaded("monitor-scheduler-test"), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), Mockito.mock(ServiceEmitter.class), - ImmutableList.of(monitor1, monitor2) + ImmutableList.of(monitor1, monitor2), + executor ); final Optional maybeFound1 = scheduler.findMonitor(Monitor1.class); @@ -62,7 +89,264 @@ public class MonitorSchedulerTest Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent()); } + + @Test + public void testStart_RepeatScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Mockito.doAnswer(new Answer>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any()); + + } + + @Test + public void testStart_RepeatAndStopScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + + Mockito.doAnswer(new Answer>() + { + private int scheduleCount = 0; + + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.FALSE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + Mockito.verify(monitor, Mockito.times(1)).stop(); + + } + + @Test + public void testStart_UnexpectedExceptionWhileMonitoring() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException()); + + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer>() + { + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.doAnswer(new Answer>() + { + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[0]; + ((Callable) originalArgument).call(); + return CompletableFuture.completedFuture(Boolean.TRUE); + } + }).when(executor).submit(ArgumentMatchers.any(Callable.class)); + + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); + } + + + @Test + public void testStart_UnexpectedExceptionWhileScheduling() + { + ExecutorService executor = Mockito.mock(ExecutorService.class); + Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); + MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); + Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); + + + Mockito.doAnswer(new Answer>() + { + @SuppressWarnings("unchecked") + @Override + public Future answer(InvocationOnMock invocation) throws Exception + { + final Object originalArgument = (invocation.getArguments())[3]; + CronTask task = ((CronTask) originalArgument); + + Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException()); + task.run(123L); + return createDummyFuture(); + } + }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + + + final MonitorScheduler scheduler = new MonitorScheduler( + config, + cronScheduler, + Mockito.mock(ServiceEmitter.class), + ImmutableList.of(monitor), + executor); + scheduler.start(); + + Mockito.verify(monitor, Mockito.times(1)).start(); + Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), + ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); + Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + } + + + private Future createDummyFuture() + { + Future future = new Future() + { + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return false; + } + + @Override + public Object get() + { + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) + { + return null; + } + + }; + + return future; + } + + private static class NoopMonitor implements Monitor { @Override @@ -82,5 +366,19 @@ public class MonitorSchedulerTest { return true; } + + @Override + public Future getScheduledFuture() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setScheduledFuture(Future scheduledFuture) + { + // TODO Auto-generated method stub + + } } } diff --git a/licenses.yaml b/licenses.yaml index 170b3d147fd..e684073d81d 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -345,6 +345,16 @@ libraries: --- +name: CronScheduler +license_category: binary +module: java-core +license_name: Apache License version 2.0 +version: 0.1 +libraries: + - io.timeandspace: cron-scheduler + +--- + name: LMAX Disruptor license_category: binary module: java-core diff --git a/pom.xml b/pom.xml index 383de94be67..ffe9460df06 100644 --- a/pom.xml +++ b/pom.xml @@ -1255,6 +1255,11 @@ 1.19.0 test + + io.timeandspace + cron-scheduler + 0.1 + diff --git a/server/pom.xml b/server/pom.xml index 7fa3a488e71..caddf483bdc 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -311,6 +311,10 @@ io.github.resilience4j resilience4j-bulkhead + + io.timeandspace + cron-scheduler + diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index 71b99454125..d7598aeacfd 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -27,6 +27,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.name.Names; +import io.timeandspace.cronscheduler.CronScheduler; import org.apache.druid.guice.DruidBinders; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.java.util.metrics.SysMonitor; import org.apache.druid.query.ExecutorServiceMonitor; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -106,9 +108,10 @@ public class MetricsModule implements Module return new MonitorScheduler( config.get(), - Execs.scheduledSingleThreaded("MonitorScheduler-%s"), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(), emitter, - monitors + monitors, + Execs.multiThreaded(64, "MonitorThread-%d") ); }