diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java index 4fbefb88e5c..ba35dfb95c0 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/AbstractMonitor.java @@ -22,8 +22,6 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import java.util.concurrent.Future; - /** */ @@ -31,8 +29,6 @@ public abstract class AbstractMonitor implements Monitor { private volatile boolean started = false; - private volatile Future scheduledFuture; - @Override public void start() { @@ -56,16 +52,4 @@ public abstract class AbstractMonitor implements Monitor } public abstract boolean doMonitor(ServiceEmitter emitter); - - @Override - public Future getScheduledFuture() - { - return scheduledFuture; - } - - @Override - public void setScheduledFuture(Future scheduledFuture) - { - this.scheduledFuture = scheduledFuture; - } } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java new file mode 100644 index 00000000000..e3e9572e573 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/metrics/BasicMonitorScheduler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +/** + * A {@link MonitorScheduler} implementation based on {@link ScheduledExecutorService}. + */ +public class BasicMonitorScheduler extends MonitorScheduler +{ + private final ScheduledExecutorService exec; + + public BasicMonitorScheduler( + MonitorSchedulerConfig config, + ServiceEmitter emitter, + List monitors, + ScheduledExecutorService exec + ) + { + super(config, emitter, monitors); + this.exec = exec; + } + + @Override + void startMonitor(Monitor monitor) + { + monitor.start(); + ScheduledExecutors.scheduleAtFixedRate( + exec, + getConfig().getEmitterPeriod(), + () -> { + // Run one more time even if the monitor was removed, in case there's some extra data to flush + if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) { + return Signal.REPEAT; + } else { + removeMonitor(monitor); + return Signal.STOP; + } + } + ); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java new file mode 100644 index 00000000000..6fcb2440697 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorScheduler.java @@ -0,0 
+1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import io.timeandspace.cronscheduler.CronScheduler; +import io.timeandspace.cronscheduler.CronTask; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link MonitorScheduler} implementation based on {@link CronScheduler}. 
+ */ +public class ClockDriftSafeMonitorScheduler extends MonitorScheduler +{ + private static final Logger LOG = new Logger(ClockDriftSafeMonitorScheduler.class); + + private final CronScheduler monitorScheduler; + private final ExecutorService monitorRunner; + + public ClockDriftSafeMonitorScheduler( + MonitorSchedulerConfig config, + ServiceEmitter emitter, + List monitors, + CronScheduler monitorScheduler, + ExecutorService monitorRunner + ) + { + super(config, emitter, monitors); + this.monitorScheduler = monitorScheduler; + this.monitorRunner = monitorRunner; + } + + @Override + void startMonitor(final Monitor monitor) + { + monitor.start(); + long rate = getConfig().getEmitterPeriod().getMillis(); + final AtomicReference> futureReference = new AtomicReference<>(); + Future future = monitorScheduler.scheduleAtFixedRate( + rate, + rate, + TimeUnit.MILLISECONDS, + new CronTask() + { + private Future cancellationFuture = null; + private Future monitorFuture = null; + + @Override + public void run(long scheduledRunTimeMillis) + { + waitForScheduleFutureToBeSet(); + if (cancellationFuture == null) { + LOG.error("scheduleFuture is not set. Can't run monitor[%s]", monitor.getClass().getName()); + return; + } + try { + // Do nothing if the monitor is still running. + if (monitorFuture == null || monitorFuture.isDone()) { + if (monitorFuture != null) { + // monitorFuture must be done at this moment if it's not null + if (!(monitorFuture.get() && hasMonitor(monitor))) { + stopMonitor(monitor); + return; + } + } + + LOG.trace("Running monitor[%s]", monitor.getClass().getName()); + monitorFuture = monitorRunner.submit(() -> { + try { + return monitor.monitor(getEmitter()); + } + catch (Throwable e) { + LOG.error( + e, + "Exception while executing monitor[%s]. 
Rescheduling in %s ms", + monitor.getClass().getName(), + rate + ); + return Boolean.TRUE; + } + }); + } + } + catch (Throwable e) { + LOG.error(e, "Uncaught exception."); + } + } + + private void waitForScheduleFutureToBeSet() + { + if (cancellationFuture == null) { + while (!Thread.currentThread().isInterrupted()) { + if (futureReference.get() != null) { + cancellationFuture = futureReference.get(); + break; + } + } + } + } + + private void stopMonitor(Monitor monitor) + { + removeMonitor(monitor); + cancellationFuture.cancel(false); + LOG.debug("Stopped monitor[%s]", monitor.getClass().getName()); + } + } + ); + futureReference.set(future); + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java index 6649312d4a2..676c77a772f 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/CompoundMonitor.java @@ -24,14 +24,11 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Future; public abstract class CompoundMonitor implements Monitor { private final List monitors; - private volatile Future scheduledFuture; - public CompoundMonitor(List monitors) { this.monitors = monitors; @@ -64,17 +61,5 @@ public abstract class CompoundMonitor implements Monitor return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter))); } - @Override - public Future getScheduledFuture() - { - return scheduledFuture; - } - - @Override - public void setScheduledFuture(Future scheduledFuture) - { - this.scheduledFuture = scheduledFuture; - } - public abstract boolean shouldReschedule(List reschedules); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java index 
8a3975e57e2..8ddb3fa8301 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java @@ -21,8 +21,6 @@ package org.apache.druid.java.util.metrics; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import java.util.concurrent.Future; - /** */ @@ -38,8 +36,4 @@ public interface Monitor * @return true if this monitor needs to continue monitoring. False otherwise. */ boolean monitor(ServiceEmitter emitter); - - Future getScheduledFuture(); - - void setScheduledFuture(Future scheduledFuture); } diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java index 961f8239482..171ca40a0fc 100644 --- a/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java +++ b/core/src/main/java/org/apache/druid/java/util/metrics/MonitorScheduler.java @@ -20,52 +20,37 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.Sets; -import io.timeandspace.cronscheduler.CronScheduler; -import io.timeandspace.cronscheduler.CronTask; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; /** */ -public class MonitorScheduler +public abstract class MonitorScheduler { - - private static final Logger log = new Logger(MonitorScheduler.class); - private final MonitorSchedulerConfig config; private final 
ServiceEmitter emitter; private final Set monitors; private final Object lock = new Object(); - private final CronScheduler scheduler; - private final ExecutorService executorService; private volatile boolean started = false; - public MonitorScheduler( + MonitorScheduler( MonitorSchedulerConfig config, - CronScheduler scheduler, ServiceEmitter emitter, - List monitors, - ExecutorService executorService + List monitors ) { this.config = config; - this.scheduler = scheduler; this.emitter = emitter; this.monitors = Sets.newHashSet(monitors); - this.executorService = executorService; } @LifecycleStart @@ -131,59 +116,23 @@ public class MonitorScheduler } } - private void startMonitor(final Monitor monitor) - { - synchronized (lock) { - monitor.start(); - long rate = config.getEmitterPeriod().getMillis(); - Future scheduledFuture = scheduler.scheduleAtFixedRate( - rate, - rate, - TimeUnit.MILLISECONDS, - new CronTask() - { - private volatile Future monitorFuture = null; - @Override - public void run(long scheduledRunTimeMillis) - { - try { - if (monitorFuture != null && monitorFuture.isDone() - && !(monitorFuture.get() && hasMonitor(monitor))) { - removeMonitor(monitor); - monitor.getScheduledFuture().cancel(false); - log.debug("Stopped rescheduling %s (delay %s)", this, rate); - return; - } - log.trace("Running %s (period %s)", this, rate); - monitorFuture = executorService.submit(new Callable() - { - @Override - public Boolean call() - { - try { - return monitor.monitor(emitter); - } - catch (Throwable e) { - log.error(e, "Uncaught exception."); - return Boolean.FALSE; - } - } - }); - } - catch (Throwable e) { - log.error(e, "Uncaught exception."); - } - } - }); - monitor.setScheduledFuture(scheduledFuture); - } - } - - private boolean hasMonitor(final Monitor monitor) + boolean hasMonitor(final Monitor monitor) { synchronized (lock) { return monitors.contains(monitor); } } - + + MonitorSchedulerConfig getConfig() + { + return config; + } + + ServiceEmitter 
getEmitter() + { + return emitter; + } + + @GuardedBy("lock") + abstract void startMonitor(Monitor monitor); } diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java new file mode 100644 index 00000000000..ea0408050ef --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; + +public class BasicMonitorSchedulerTest +{ + private final MonitorSchedulerConfig config = new MonitorSchedulerConfig() + { + @Override + public Duration getEmitterPeriod() + { + return Duration.millis(5); + } + }; + private ServiceEmitter emitter; + private ScheduledExecutorService exec; + + @Before + public void setup() + { + emitter = Mockito.mock(ServiceEmitter.class); + exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest"); + } + + @Test + public void testStart_RepeatScheduling() throws InterruptedException + { + final Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true); + + final BasicMonitorScheduler scheduler = new BasicMonitorScheduler( + config, + emitter, + ImmutableList.of(monitor), + exec + ); + scheduler.start(); + Thread.sleep(100); + Mockito.verify(monitor, Mockito.atLeast(2)).monitor(ArgumentMatchers.any()); + scheduler.stop(); + } + + @Test + public void testStart_RepeatAndStopScheduling() throws InterruptedException + { + final Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true, true, true, false); + + final BasicMonitorScheduler scheduler = new BasicMonitorScheduler( + config, + emitter, + ImmutableList.of(monitor), + exec + ); + scheduler.start(); + Thread.sleep(100); + // monitor.monitor() is called 5 times since a new task is scheduled first and then the current one is executed. + // See ScheduledExecutors.scheduleAtFixedRate() for details. 
+ Mockito.verify(monitor, Mockito.times(5)).monitor(ArgumentMatchers.any()); + scheduler.stop(); + } + + @Test + public void testStart_UnexpectedExceptionWhileMonitoring_ContinueMonitor() throws InterruptedException + { + final Monitor monitor = Mockito.mock(Monitor.class); + Mockito.when(monitor.monitor(ArgumentMatchers.any())) + .thenThrow(new RuntimeException("Test throwing exception while monitoring")); + + final BasicMonitorScheduler scheduler = new BasicMonitorScheduler( + config, + emitter, + ImmutableList.of(monitor), + exec + ); + scheduler.start(); + Thread.sleep(100); + // monitor.monitor() throws on every call, yet the scheduler must keep calling it, hence at least 2 invocations. + // See ScheduledExecutors.scheduleAtFixedRate() for details. + Mockito.verify(monitor, Mockito.atLeast(2)).monitor(ArgumentMatchers.any()); + scheduler.stop(); + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java similarity index 73% rename from core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java rename to core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java index da2ba5926f5..b5e80a2ea6c 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/MonitorSchedulerTest.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/ClockDriftSafeMonitorSchedulerTest.java @@ -23,7 +23,9 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableList; import io.timeandspace.cronscheduler.CronScheduler; import io.timeandspace.cronscheduler.CronTask; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -38,21 +40,37 @@ import java.time.Duration; import
java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -public class MonitorSchedulerTest +public class ClockDriftSafeMonitorSchedulerTest { - + // A real executor service to execute CronTask asynchronously. + // Many tests in this class use mocks to easily control the behavior of CronScheduler and the ExecutorService + // used by MonitorScheduler. However, as MonitorScheduler uses two different threads in production, one for + // scheduling a task to schedule a monitor (CronScheduler), and another for running a scheduled monitor + // asynchronously, these tests also need to run some tasks in an asynchronous manner. As mocks are convenient + // enough to control the behavior of things, we use another executorService only to run some tasks asynchronously + // to mimic the nature of asynchronous execution in MonitorScheduler.
+ private ExecutorService cronTaskRunner; @Mock private CronScheduler cronScheduler; @Before public void setUp() { + cronTaskRunner = Execs.singleThreaded("monitor-scheduler-test"); MockitoAnnotations.initMocks(this); } + + @After + public void tearDown() + { + cronTaskRunner.shutdownNow(); + } @Test public void testFindMonitor() @@ -72,11 +90,11 @@ public class MonitorSchedulerTest ExecutorService executor = Mockito.mock(ExecutorService.class); - final MonitorScheduler scheduler = new MonitorScheduler( + final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( Mockito.mock(MonitorSchedulerConfig.class), - CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor1, monitor2), + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(), executor ); @@ -91,17 +109,18 @@ public class MonitorSchedulerTest } @Test - public void testStart_RepeatScheduling() + public void testStart_RepeatScheduling() throws InterruptedException { ExecutorService executor = Mockito.mock(ExecutorService.class); + CountDownLatch latch = new CountDownLatch(1); Mockito.doAnswer(new Answer>() { private int scheduleCount = 0; @SuppressWarnings("unchecked") @Override - public Future answer(InvocationOnMock invocation) throws Exception + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[3]; CronTask task = ((CronTask) originalArgument); @@ -117,10 +136,14 @@ public class MonitorSchedulerTest } }).when(executor).submit(ArgumentMatchers.any(Callable.class)); - while (scheduleCount < 2) { - scheduleCount++; - task.run(123L); - } + cronTaskRunner.submit(() -> { + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + latch.countDown(); + return null; + }); return createDummyFuture(); } }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), 
ArgumentMatchers.anyLong(), @@ -131,13 +154,15 @@ public class MonitorSchedulerTest MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); - final MonitorScheduler scheduler = new MonitorScheduler( + final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( config, - cronScheduler, Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor), - executor); + cronScheduler, + executor + ); scheduler.start(); + latch.await(5, TimeUnit.SECONDS); Mockito.verify(monitor, Mockito.times(1)).start(); Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), @@ -145,21 +170,22 @@ public class MonitorSchedulerTest ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class)); Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any()); - + scheduler.stop(); } @Test - public void testStart_RepeatAndStopScheduling() + public void testStart_RepeatAndStopScheduling() throws InterruptedException { ExecutorService executor = Mockito.mock(ExecutorService.class); + CountDownLatch latch = new CountDownLatch(1); Mockito.doAnswer(new Answer>() { private int scheduleCount = 0; @SuppressWarnings("unchecked") @Override - public Future answer(InvocationOnMock invocation) throws Exception + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[3]; CronTask task = ((CronTask) originalArgument); @@ -174,29 +200,34 @@ public class MonitorSchedulerTest } }).when(executor).submit(ArgumentMatchers.any(Callable.class)); - while (scheduleCount < 2) { - scheduleCount++; - task.run(123L); - } + cronTaskRunner.submit(() -> { + while (scheduleCount < 2) { + scheduleCount++; + task.run(123L); + } + latch.countDown(); + return null; + }); return createDummyFuture(); } 
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); Monitor monitor = Mockito.mock(Monitor.class); - Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); - final MonitorScheduler scheduler = new MonitorScheduler( + final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( config, - cronScheduler, Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor), - executor); + cronScheduler, + executor + ); scheduler.start(); - + latch.await(5, TimeUnit.SECONDS); + Mockito.verify(monitor, Mockito.times(1)).start(); Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), @@ -204,26 +235,27 @@ public class MonitorSchedulerTest Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any()); Mockito.verify(monitor, Mockito.times(1)).stop(); - + scheduler.stop(); } @Test - public void testStart_UnexpectedExceptionWhileMonitoring() + public void testStart_UnexpectedExceptionWhileMonitoring() throws InterruptedException { ExecutorService executor = Mockito.mock(ExecutorService.class); Monitor monitor = Mockito.mock(Monitor.class); - Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); - Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException()); + Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))) + .thenThrow(new RuntimeException("Test throwing exception while monitoring")); MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new 
org.joda.time.Duration(1000L)); - + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean monitorResultHolder = new AtomicBoolean(false); Mockito.doAnswer(new Answer>() { @SuppressWarnings("unchecked") @Override - public Future answer(InvocationOnMock invocation) throws Exception + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[3]; CronTask task = ((CronTask) originalArgument); @@ -234,78 +266,92 @@ public class MonitorSchedulerTest public Future answer(InvocationOnMock invocation) throws Exception { final Object originalArgument = (invocation.getArguments())[0]; - ((Callable) originalArgument).call(); - return CompletableFuture.completedFuture(Boolean.TRUE); + final boolean continueMonitor = ((Callable) originalArgument).call(); + monitorResultHolder.set(continueMonitor); + return CompletableFuture.completedFuture(continueMonitor); } }).when(executor).submit(ArgumentMatchers.any(Callable.class)); - task.run(123L); + cronTaskRunner.submit(() -> { + task.run(123L); + latch.countDown(); + return null; + }); return createDummyFuture(); } }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); - final MonitorScheduler scheduler = new MonitorScheduler( + final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( config, - cronScheduler, Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor), - executor); + cronScheduler, + executor + ); scheduler.start(); - + latch.await(5, TimeUnit.SECONDS); + Mockito.verify(monitor, Mockito.times(1)).start(); Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); Mockito.verify(monitor, 
Mockito.times(1)).monitor(ArgumentMatchers.any()); + Assert.assertTrue(monitorResultHolder.get()); + scheduler.stop(); } - @Test - public void testStart_UnexpectedExceptionWhileScheduling() + public void testStart_UnexpectedExceptionWhileScheduling() throws InterruptedException { ExecutorService executor = Mockito.mock(ExecutorService.class); Monitor monitor = Mockito.mock(Monitor.class); - Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture()); MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class); Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L)); - + CountDownLatch latch = new CountDownLatch(1); Mockito.doAnswer(new Answer>() { @SuppressWarnings("unchecked") @Override - public Future answer(InvocationOnMock invocation) throws Exception + public Future answer(InvocationOnMock invocation) { final Object originalArgument = (invocation.getArguments())[3]; CronTask task = ((CronTask) originalArgument); - Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException()); - task.run(123L); + Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))) + .thenThrow(new RuntimeException("Test throwing exception while scheduling")); + cronTaskRunner.submit(() -> { + task.run(123L); + latch.countDown(); + return null; + }); return createDummyFuture(); } }).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); - final MonitorScheduler scheduler = new MonitorScheduler( + final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler( config, - cronScheduler, Mockito.mock(ServiceEmitter.class), ImmutableList.of(monitor), - executor); + cronScheduler, + executor + ); scheduler.start(); - + latch.await(5, TimeUnit.SECONDS); + Mockito.verify(monitor, Mockito.times(1)).start(); Mockito.verify(cronScheduler, 
Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class)); Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class)); + scheduler.stop(); } - private Future createDummyFuture() { Future future = new Future() @@ -366,19 +412,5 @@ public class MonitorSchedulerTest { return true; } - - @Override - public Future getScheduledFuture() - { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setScheduledFuture(Future scheduledFuture) - { - // TODO Auto-generated method stub - - } } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java index 228360f9bc2..0e242b1244c 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java +++ b/server/src/main/java/org/apache/druid/server/metrics/DruidMonitorSchedulerConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.server.metrics; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.metrics.BasicMonitorScheduler; import org.apache.druid.java.util.metrics.MonitorSchedulerConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -28,9 +29,17 @@ import org.joda.time.Period; */ public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig { + @JsonProperty + private String schedulerClassName = BasicMonitorScheduler.class.getName(); + @JsonProperty private Period emissionPeriod = new Period("PT1M"); + public String getSchedulerClassName() + { + return schedulerClassName; + } + @JsonProperty public Period getEmissionPeriod() { diff --git a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java index d7598aeacfd..df6fe8bc8d0 100644 --- 
a/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java +++ b/server/src/main/java/org/apache/druid/server/metrics/MetricsModule.java @@ -32,9 +32,12 @@ import org.apache.druid.guice.DruidBinders; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.BasicMonitorScheduler; +import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler; import org.apache.druid.java.util.metrics.JvmCpuMonitor; import org.apache.druid.java.util.metrics.JvmMonitor; import org.apache.druid.java.util.metrics.JvmThreadsMonitor; @@ -56,6 +59,7 @@ import java.util.stream.Collectors; */ public class MetricsModule implements Module { + static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring"; private static final Logger log = new Logger(MetricsModule.class); public static void register(Binder binder, Class monitorClazz) @@ -66,8 +70,8 @@ public class MetricsModule implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class); - JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class); + JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, DruidMonitorSchedulerConfig.class); + JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, MonitorsConfig.class); DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum. 
@@ -106,13 +110,24 @@ public class MetricsModule implements Module ); } - return new MonitorScheduler( - config.get(), - CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(), - emitter, - monitors, - Execs.multiThreaded(64, "MonitorThread-%d") - ); + /* Choose the scheduler implementation by comparing the configured schedulerClassName against the class names of the two known implementations. */ if (ClockDriftSafeMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) { + return new ClockDriftSafeMonitorScheduler( + config.get(), + emitter, + monitors, + CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler").build(), + Execs.singleThreaded("MonitorRunner") + ); + } else if (BasicMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) { + return new BasicMonitorScheduler( + config.get(), + emitter, + monitors, + Execs.scheduledSingleThreaded("MonitorScheduler-%s") + ); + } else { + /* Neither known class name matched: reject the configured value, echoing it in the message. */ throw new IAE("Unknown monitor scheduler[%s]", config.get().getSchedulerClassName()); + } } @Provides diff --git a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java index a94d6cc9370..cba4f84dbd6 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/MetricsModuleTest.java @@ -21,20 +21,41 @@ package org.apache.druid.server.metrics; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.CreationException; +import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Scopes; import com.google.inject.name.Names; import org.apache.druid.guice.GuiceInjectors; import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.guice.LazySingleton; +import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Self; import
org.apache.druid.initialization.Initialization; +import org.apache.druid.jackson.JacksonModule; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.BasicMonitorScheduler; +import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler; +import org.apache.druid.java.util.metrics.MonitorScheduler; import org.apache.druid.server.DruidNode; +import org.hamcrest.CoreMatchers; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; + +import javax.validation.Validation; +import javax.validation.Validator; +import java.util.Properties; public class MetricsModuleTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Test public void testSimpleInjection() { @@ -88,4 +109,64 @@ public class MetricsModuleTest Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource()); Assert.assertEquals(taskId, dimensionIdHolder.getTaskId()); } + + /** With empty Properties, the MonitorScheduler obtained from the injector must be exactly BasicMonitorScheduler. */ @Test + public void testGetBasicMonitorSchedulerByDefault() + { + final MonitorScheduler monitorScheduler = createInjector(new Properties()).getInstance(MonitorScheduler.class); + Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass()); + } + + /** Setting the schedulerClassName property (under MONITORING_PROPERTY_PREFIX) to ClockDriftSafeMonitorScheduler's class name must yield exactly that class. */ @Test + public void testGetClockDriftSafeMonitorSchedulerViaConfig() + { + final Properties properties = new Properties(); + properties.setProperty( + StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX), + ClockDriftSafeMonitorScheduler.class.getName() + ); + final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class); + Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass()); + } + + /** Setting the schedulerClassName property explicitly to BasicMonitorScheduler's class name must yield BasicMonitorScheduler. */ @Test + public void testGetBasicMonitorSchedulerViaConfig() + { + final Properties properties = new Properties(); + properties.setProperty( +
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX), + BasicMonitorScheduler.class.getName() + ); + final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class); + Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass()); + } + + /** An unrecognized schedulerClassName value is expected to surface as a CreationException whose cause is an IllegalArgumentException and whose message contains "Unknown monitor scheduler[UnknownScheduler]". */ @Test + public void testGetMonitorSchedulerUnknownSchedulerException() + { + final Properties properties = new Properties(); + properties.setProperty( + StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX), + "UnknownScheduler" + ); + expectedException.expect(CreationException.class); + expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class)); + expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]"); + createInjector(properties).getInstance(MonitorScheduler.class); + } + + /** Builds a Guice injector from JacksonModule, LifecycleModule, MetricsModule and an inline module that binds a default Validator, maps the LazySingleton scope to Scopes.SINGLETON, and binds a NoopServiceEmitter and the given Properties instance. */ private static Injector createInjector(Properties properties) + { + return Guice.createInjector( + new JacksonModule(), + new LifecycleModule(), + binder -> { + binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); + binder.bindScope(LazySingleton.class, Scopes.SINGLETON); + binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter()); + binder.bind(Properties.class).toInstance(properties); + }, + new MetricsModule() + ); + } }