Add a config for monitorScheduler type (#10732)

* Add a config for monitorScheduler type

* check interrupted

* null check

* do not schedule monitor if the previous one is still running

* checkstyle

* clean up names

* change default back to basic

* fix test
This commit is contained in:
Jihoon Son 2021-01-13 17:20:43 -08:00 committed by GitHub
parent 149306c9db
commit b3325c1601
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 538 additions and 179 deletions

View File

@ -22,8 +22,6 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.concurrent.Future;
/**
*/
@ -31,8 +29,6 @@ public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
private volatile Future<?> scheduledFuture;
@Override
public void start()
{
@ -56,16 +52,4 @@ public abstract class AbstractMonitor implements Monitor
}
public abstract boolean doMonitor(ServiceEmitter emitter);
@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors.Signal;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
/**
* A {@link MonitorScheduler} implementation based on {@link ScheduledExecutorService}.
*/
public class BasicMonitorScheduler extends MonitorScheduler
{
private final ScheduledExecutorService exec;
public BasicMonitorScheduler(
MonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
ScheduledExecutorService exec
)
{
super(config, emitter, monitors);
this.exec = exec;
}
@Override
void startMonitor(Monitor monitor)
{
monitor.start();
ScheduledExecutors.scheduleAtFixedRate(
exec,
getConfig().getEmitterPeriod(),
() -> {
// Run one more time even if the monitor was removed, in case there's some extra data to flush
if (monitor.monitor(getEmitter()) && hasMonitor(monitor)) {
return Signal.REPEAT;
} else {
removeMonitor(monitor);
return Signal.STOP;
}
}
);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* A {@link MonitorScheduler} implementation based on {@link CronScheduler}.
*/
public class ClockDriftSafeMonitorScheduler extends MonitorScheduler
{
private static final Logger LOG = new Logger(ClockDriftSafeMonitorScheduler.class);
private final CronScheduler monitorScheduler;
private final ExecutorService monitorRunner;
public ClockDriftSafeMonitorScheduler(
MonitorSchedulerConfig config,
ServiceEmitter emitter,
List<Monitor> monitors,
CronScheduler monitorScheduler,
ExecutorService monitorRunner
)
{
super(config, emitter, monitors);
this.monitorScheduler = monitorScheduler;
this.monitorRunner = monitorRunner;
}
@Override
void startMonitor(final Monitor monitor)
{
monitor.start();
long rate = getConfig().getEmitterPeriod().getMillis();
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
Future<?> future = monitorScheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private Future<?> cancellationFuture = null;
private Future<Boolean> monitorFuture = null;
@Override
public void run(long scheduledRunTimeMillis)
{
waitForScheduleFutureToBeSet();
if (cancellationFuture == null) {
LOG.error("scheduleFuture is not set. Can't run monitor[%s]", monitor.getClass().getName());
return;
}
try {
// Do nothing if the monitor is still running.
if (monitorFuture == null || monitorFuture.isDone()) {
if (monitorFuture != null) {
// monitorFuture must be done at this moment if it's not null
if (!(monitorFuture.get() && hasMonitor(monitor))) {
stopMonitor(monitor);
return;
}
}
LOG.trace("Running monitor[%s]", monitor.getClass().getName());
monitorFuture = monitorRunner.submit(() -> {
try {
return monitor.monitor(getEmitter());
}
catch (Throwable e) {
LOG.error(
e,
"Exception while executing monitor[%s]. Rescheduling in %s ms",
monitor.getClass().getName(),
rate
);
return Boolean.TRUE;
}
});
}
}
catch (Throwable e) {
LOG.error(e, "Uncaught exception.");
}
}
private void waitForScheduleFutureToBeSet()
{
if (cancellationFuture == null) {
while (!Thread.currentThread().isInterrupted()) {
if (futureReference.get() != null) {
cancellationFuture = futureReference.get();
break;
}
}
}
}
private void stopMonitor(Monitor monitor)
{
removeMonitor(monitor);
cancellationFuture.cancel(false);
LOG.debug("Stopped monitor[%s]", monitor.getClass().getName());
}
}
);
futureReference.set(future);
}
}

View File

@ -24,14 +24,11 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;
private volatile Future<?> scheduledFuture;
public CompoundMonitor(List<Monitor> monitors)
{
this.monitors = monitors;
@ -64,17 +61,5 @@ public abstract class CompoundMonitor implements Monitor
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}
@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
public abstract boolean shouldReschedule(List<Boolean> reschedules);
}

View File

@ -21,8 +21,6 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.concurrent.Future;
/**
*/
@ -38,8 +36,4 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);
Future<?> getScheduledFuture();
void setScheduledFuture(Future<?> scheduledFuture);
}

View File

@ -20,52 +20,37 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*/
public class MonitorScheduler
public abstract class MonitorScheduler
{
private static final Logger log = new Logger(MonitorScheduler.class);
private final MonitorSchedulerConfig config;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
private final CronScheduler scheduler;
private final ExecutorService executorService;
private volatile boolean started = false;
public MonitorScheduler(
MonitorScheduler(
MonitorSchedulerConfig config,
CronScheduler scheduler,
ServiceEmitter emitter,
List<Monitor> monitors,
ExecutorService executorService
List<Monitor> monitors
)
{
this.config = config;
this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
this.executorService = executorService;
}
@LifecycleStart
@ -131,59 +116,23 @@ public class MonitorScheduler
}
}
private void startMonitor(final Monitor monitor)
{
synchronized (lock) {
monitor.start();
long rate = config.getEmitterPeriod().getMillis();
Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private volatile Future<Boolean> monitorFuture = null;
@Override
public void run(long scheduledRunTimeMillis)
{
try {
if (monitorFuture != null && monitorFuture.isDone()
&& !(monitorFuture.get() && hasMonitor(monitor))) {
removeMonitor(monitor);
monitor.getScheduledFuture().cancel(false);
log.debug("Stopped rescheduling %s (delay %s)", this, rate);
return;
}
log.trace("Running %s (period %s)", this, rate);
monitorFuture = executorService.submit(new Callable<Boolean>()
{
@Override
public Boolean call()
{
try {
return monitor.monitor(emitter);
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
return Boolean.FALSE;
}
}
});
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
}
}
});
monitor.setScheduledFuture(scheduledFuture);
}
}
private boolean hasMonitor(final Monitor monitor)
boolean hasMonitor(final Monitor monitor)
{
synchronized (lock) {
return monitors.contains(monitor);
}
}
MonitorSchedulerConfig getConfig()
{
return config;
}
ServiceEmitter getEmitter()
{
return emitter;
}
@GuardedBy("lock")
abstract void startMonitor(Monitor monitor);
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.concurrent.ScheduledExecutorService;
public class BasicMonitorSchedulerTest
{
private final MonitorSchedulerConfig config = new MonitorSchedulerConfig()
{
@Override
public Duration getEmitterPeriod()
{
return Duration.millis(5);
}
};
private ServiceEmitter emitter;
private ScheduledExecutorService exec;
@Before
public void setup()
{
emitter = Mockito.mock(ServiceEmitter.class);
exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest");
}
@Test
public void testStart_RepeatScheduling() throws InterruptedException
{
final Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true);
final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
config,
emitter,
ImmutableList.of(monitor),
exec
);
scheduler.start();
Thread.sleep(100);
Mockito.verify(monitor, Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
scheduler.stop();
}
@Test
public void testStart_RepeatAndStopScheduling() throws InterruptedException
{
final Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.monitor(ArgumentMatchers.any())).thenReturn(true, true, true, false);
final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
config,
emitter,
ImmutableList.of(monitor),
exec
);
scheduler.start();
Thread.sleep(100);
// monitor.monitor() is called 5 times since a new task is scheduled first and then the current one is executed.
// See ScheduledExecutors.scheduleAtFixedRate() for details.
Mockito.verify(monitor, Mockito.times(5)).monitor(ArgumentMatchers.any());
scheduler.stop();
}
@Test
public void testStart_UnexpectedExceptionWhileMonitoring_ContinueMonitor() throws InterruptedException
{
final Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.monitor(ArgumentMatchers.any()))
.thenThrow(new RuntimeException("Test throwing exception while monitoring"));
final BasicMonitorScheduler scheduler = new BasicMonitorScheduler(
config,
emitter,
ImmutableList.of(monitor),
exec
);
scheduler.start();
Thread.sleep(100);
// monitor.monitor() is called 5 times since a new task is scheduled first and then the current one is executed.
// See ScheduledExecutors.scheduleAtFixedRate() for details.
Mockito.verify(monitor, Mockito.atLeast(2)).monitor(ArgumentMatchers.any());
scheduler.stop();
}
}

View File

@ -23,7 +23,9 @@ package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -38,21 +40,37 @@ import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class MonitorSchedulerTest
public class ClockDriftSafeMonitorSchedulerTest
{
// A real executor service to execute CronTask asynchronously.
// Many tests in this class use mocks to easily control the behavior of CronScheduler and the ExecutorService
// used by MonitorScheduler. However, as MonitorScheduler uses two differnt threads in production, one for
// scheduling a task to schedule a monitor (CronScheduler), and another for running a scheduled monitor
// asynchronously, these tests also require to run some tasks in an asynchronous manner. As mocks are convenient
// enough to control the behavior of things, we use another executorService only to run some tasks asynchronously
// to mimic the nature of asynchronous execution in MonitorScheduler.
private ExecutorService cronTaskRunner;
@Mock
private CronScheduler cronScheduler;
@Before
public void setUp()
{
cronTaskRunner = Execs.singleThreaded("monitor-scheduler-test");
MockitoAnnotations.initMocks(this);
}
@After
public void tearDown()
{
cronTaskRunner.shutdownNow();
}
@Test
public void testFindMonitor()
@ -72,11 +90,11 @@ public class MonitorSchedulerTest
ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new MonitorScheduler(
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
executor
);
@ -91,17 +109,18 @@ public class MonitorSchedulerTest
}
@Test
public void testStart_RepeatScheduling()
public void testStart_RepeatScheduling() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@ -117,10 +136,14 @@ public class MonitorSchedulerTest
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
cronTaskRunner.submit(() -> {
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
latch.countDown();
return null;
});
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
@ -131,13 +154,15 @@ public class MonitorSchedulerTest
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new MonitorScheduler(
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
cronScheduler,
executor
);
scheduler.start();
latch.await(5, TimeUnit.SECONDS);
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
@ -145,21 +170,22 @@ public class MonitorSchedulerTest
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
scheduler.stop();
}
@Test
public void testStart_RepeatAndStopScheduling()
public void testStart_RepeatAndStopScheduling() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@ -174,29 +200,34 @@ public class MonitorSchedulerTest
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
cronTaskRunner.submit(() -> {
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
latch.countDown();
return null;
});
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new MonitorScheduler(
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
cronScheduler,
executor
);
scheduler.start();
latch.await(5, TimeUnit.SECONDS);
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
@ -204,26 +235,27 @@ public class MonitorSchedulerTest
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.times(1)).stop();
scheduler.stop();
}
@Test
public void testStart_UnexpectedExceptionWhileMonitoring()
public void testStart_UnexpectedExceptionWhileMonitoring() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException());
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class)))
.thenThrow(new RuntimeException("Test throwing exception while monitoring"));
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean monitorResultHolder = new AtomicBoolean(false);
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
@ -234,78 +266,92 @@ public class MonitorSchedulerTest
public Future<Boolean> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[0];
((Callable<Boolean>) originalArgument).call();
return CompletableFuture.completedFuture(Boolean.TRUE);
final boolean continueMonitor = ((Callable<Boolean>) originalArgument).call();
monitorResultHolder.set(continueMonitor);
return CompletableFuture.completedFuture(continueMonitor);
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
task.run(123L);
cronTaskRunner.submit(() -> {
task.run(123L);
latch.countDown();
return null;
});
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
final MonitorScheduler scheduler = new MonitorScheduler(
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
cronScheduler,
executor
);
scheduler.start();
latch.await(5, TimeUnit.SECONDS);
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
Assert.assertTrue(monitorResultHolder.get());
scheduler.stop();
}
@Test
public void testStart_UnexpectedExceptionWhileScheduling()
public void testStart_UnexpectedExceptionWhileScheduling() throws InterruptedException
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
CountDownLatch latch = new CountDownLatch(1);
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
public Future<?> answer(InvocationOnMock invocation)
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException());
task.run(123L);
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class)))
.thenThrow(new RuntimeException("Test throwing exception while scheduling"));
cronTaskRunner.submit(() -> {
task.run(123L);
latch.countDown();
return null;
});
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
final MonitorScheduler scheduler = new MonitorScheduler(
final MonitorScheduler scheduler = new ClockDriftSafeMonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
cronScheduler,
executor
);
scheduler.start();
latch.await(5, TimeUnit.SECONDS);
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
scheduler.stop();
}
private Future createDummyFuture()
{
Future<?> future = new Future()
@ -366,19 +412,5 @@ public class MonitorSchedulerTest
{
return true;
}
@Override
public Future<?> getScheduledFuture()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
// TODO Auto-generated method stub
}
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.server.metrics;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.MonitorSchedulerConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -28,9 +29,17 @@ import org.joda.time.Period;
*/
public class DruidMonitorSchedulerConfig extends MonitorSchedulerConfig
{
@JsonProperty
private String schedulerClassName = BasicMonitorScheduler.class.getName();
@JsonProperty
private Period emissionPeriod = new Period("PT1M");
public String getSchedulerClassName()
{
return schedulerClassName;
}
@JsonProperty
public Period getEmissionPeriod()
{

View File

@ -32,9 +32,12 @@ import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
import org.apache.druid.java.util.metrics.JvmCpuMonitor;
import org.apache.druid.java.util.metrics.JvmMonitor;
import org.apache.druid.java.util.metrics.JvmThreadsMonitor;
@ -56,6 +59,7 @@ import java.util.stream.Collectors;
*/
public class MetricsModule implements Module
{
static final String MONITORING_PROPERTY_PREFIX = "druid.monitoring";
private static final Logger log = new Logger(MetricsModule.class);
public static void register(Binder binder, Class<? extends Monitor> monitorClazz)
@ -66,8 +70,8 @@ public class MetricsModule implements Module
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.monitoring", DruidMonitorSchedulerConfig.class);
JsonConfigProvider.bind(binder, "druid.monitoring", MonitorsConfig.class);
JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, DruidMonitorSchedulerConfig.class);
JsonConfigProvider.bind(binder, MONITORING_PROPERTY_PREFIX, MonitorsConfig.class);
DruidBinders.metricMonitorBinder(binder); // get the binder so that it will inject the empty set at a minimum.
@ -106,13 +110,24 @@ public class MetricsModule implements Module
);
}
return new MonitorScheduler(
config.get(),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
emitter,
monitors,
Execs.multiThreaded(64, "MonitorThread-%d")
);
if (ClockDriftSafeMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
return new ClockDriftSafeMonitorScheduler(
config.get(),
emitter,
monitors,
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorScheduler").build(),
Execs.singleThreaded("MonitorRunner")
);
} else if (BasicMonitorScheduler.class.getName().equals(config.get().getSchedulerClassName())) {
return new BasicMonitorScheduler(
config.get(),
emitter,
monitors,
Execs.scheduledSingleThreaded("MonitorScheduler-%s")
);
} else {
throw new IAE("Unknown monitor scheduler[%s]", config.get().getSchedulerClassName());
}
}
@Provides

View File

@ -21,20 +21,41 @@ package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.CreationException;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.BasicMonitorScheduler;
import org.apache.druid.java.util.metrics.ClockDriftSafeMonitorScheduler;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.server.DruidNode;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
public class MetricsModuleTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSimpleInjection()
{
@ -88,4 +109,64 @@ public class MetricsModuleTest
Assert.assertEquals(dataSource, dimensionIdHolder.getDataSource());
Assert.assertEquals(taskId, dimensionIdHolder.getTaskId());
}
@Test
public void testGetBasicMonitorSchedulerByDefault()
{
final MonitorScheduler monitorScheduler = createInjector(new Properties()).getInstance(MonitorScheduler.class);
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
}
@Test
public void testGetClockDriftSafeMonitorSchedulerViaConfig()
{
final Properties properties = new Properties();
properties.setProperty(
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
ClockDriftSafeMonitorScheduler.class.getName()
);
final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
Assert.assertSame(ClockDriftSafeMonitorScheduler.class, monitorScheduler.getClass());
}
@Test
public void testGetBasicMonitorSchedulerViaConfig()
{
final Properties properties = new Properties();
properties.setProperty(
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
BasicMonitorScheduler.class.getName()
);
final MonitorScheduler monitorScheduler = createInjector(properties).getInstance(MonitorScheduler.class);
Assert.assertSame(BasicMonitorScheduler.class, monitorScheduler.getClass());
}
@Test
public void testGetMonitorSchedulerUnknownSchedulerException()
{
final Properties properties = new Properties();
properties.setProperty(
StringUtils.format("%s.schedulerClassName", MetricsModule.MONITORING_PROPERTY_PREFIX),
"UnknownScheduler"
);
expectedException.expect(CreationException.class);
expectedException.expectCause(CoreMatchers.instanceOf(IllegalArgumentException.class));
expectedException.expectMessage("Unknown monitor scheduler[UnknownScheduler]");
createInjector(properties).getInstance(MonitorScheduler.class);
}
private static Injector createInjector(Properties properties)
{
return Guice.createInjector(
new JacksonModule(),
new LifecycleModule(),
binder -> {
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
binder.bind(ServiceEmitter.class).toInstance(new NoopServiceEmitter());
binder.bind(Properties.class).toInstance(properties);
},
new MetricsModule()
);
}
}