Added CronScheduler support as a proof to clock drift while emitting metrics (#10448)

Co-authored-by: Ayush Kulshrestha <ayush.kulshrestha@miqdigital.com>
This commit is contained in:
Ayush Kulshrestha 2020-11-25 17:01:38 +05:30 committed by GitHub
parent fe693a4f01
commit d0c2ede50c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 423 additions and 25 deletions

View File

@ -224,6 +224,10 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
<dependency>
<groupId>io.timeandspace</groupId>
<artifactId>cron-scheduler</artifactId>
</dependency>

View File

@ -22,11 +22,16 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.concurrent.Future;
/**
*/
public abstract class AbstractMonitor implements Monitor
{
private volatile boolean started = false;
private volatile Future<?> scheduledFuture;
@Override
public void start()
@ -51,4 +56,16 @@ public abstract class AbstractMonitor implements Monitor
}
public abstract boolean doMonitor(ServiceEmitter emitter);
@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
}

View File

@ -24,10 +24,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
public abstract class CompoundMonitor implements Monitor
{
private final List<Monitor> monitors;
private volatile Future<?> scheduledFuture;
public CompoundMonitor(List<Monitor> monitors)
{
@ -61,5 +64,17 @@ public abstract class CompoundMonitor implements Monitor
return shouldReschedule(Lists.transform(monitors, monitor -> monitor.monitor(emitter)));
}
@Override
public Future<?> getScheduledFuture()
{
return scheduledFuture;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
this.scheduledFuture = scheduledFuture;
}
public abstract boolean shouldReschedule(List<Boolean> reschedules);
}

View File

@ -21,6 +21,9 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.concurrent.Future;
/**
*/
public interface Monitor
@ -35,4 +38,8 @@ public interface Monitor
* @return true if this monitor needs to continue monitoring. False otherwise.
*/
boolean monitor(ServiceEmitter emitter);
Future<?> getScheduledFuture();
void setScheduledFuture(Future<?> scheduledFuture);
}

View File

@ -20,41 +20,52 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.Sets;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
*/
public class MonitorScheduler
{
private static final Logger log = new Logger(MonitorScheduler.class);
private final MonitorSchedulerConfig config;
private final ScheduledExecutorService exec;
private final ServiceEmitter emitter;
private final Set<Monitor> monitors;
private final Object lock = new Object();
private final CronScheduler scheduler;
private final ExecutorService executorService;
private volatile boolean started = false;
public MonitorScheduler(
MonitorSchedulerConfig config,
ScheduledExecutorService exec,
CronScheduler scheduler,
ServiceEmitter emitter,
List<Monitor> monitors
List<Monitor> monitors,
ExecutorService executorService
)
{
this.config = config;
this.exec = exec;
this.scheduler = scheduler;
this.emitter = emitter;
this.monitors = Sets.newHashSet(monitors);
this.executorService = executorService;
}
@LifecycleStart
@ -124,24 +135,47 @@ public class MonitorScheduler
{
synchronized (lock) {
monitor.start();
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getEmitterPeriod(),
new Callable<ScheduledExecutors.Signal>()
long rate = config.getEmitterPeriod().getMillis();
Future<?> scheduledFuture = scheduler.scheduleAtFixedRate(
rate,
rate,
TimeUnit.MILLISECONDS,
new CronTask()
{
private volatile Future<Boolean> monitorFuture = null;
@Override
public ScheduledExecutors.Signal call()
public void run(long scheduledRunTimeMillis)
{
// Run one more time even if the monitor was removed, in case there's some extra data to flush
if (monitor.monitor(emitter) && hasMonitor(monitor)) {
return ScheduledExecutors.Signal.REPEAT;
} else {
removeMonitor(monitor);
return ScheduledExecutors.Signal.STOP;
try {
if (monitorFuture != null && monitorFuture.isDone()
&& !(monitorFuture.get() && hasMonitor(monitor))) {
removeMonitor(monitor);
monitor.getScheduledFuture().cancel(false);
log.debug("Stopped rescheduling %s (delay %s)", this, rate);
return;
}
log.trace("Running %s (period %s)", this, rate);
monitorFuture = executorService.submit(new Callable<Boolean>()
{
@Override
public Boolean call()
{
try {
return monitor.monitor(emitter);
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
return Boolean.FALSE;
}
}
});
}
catch (Throwable e) {
log.error(e, "Uncaught exception.");
}
}
}
);
});
monitor.setScheduledFuture(scheduledFuture);
}
}
@ -151,4 +185,5 @@ public class MonitorScheduler
return monitors.contains(monitor);
}
}
}

View File

@ -19,17 +19,41 @@
package org.apache.druid.java.util.metrics;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.concurrent.Execs;
import io.timeandspace.cronscheduler.CronScheduler;
import io.timeandspace.cronscheduler.CronTask;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class MonitorSchedulerTest
{
@Mock
private CronScheduler cronScheduler;
@Before
public void setUp()
{
MockitoAnnotations.initMocks(this);
}
@Test
public void testFindMonitor()
{
@ -45,12 +69,15 @@ public class MonitorSchedulerTest
final Monitor1 monitor1 = new Monitor1();
final Monitor2 monitor2 = new Monitor2();
ExecutorService executor = Mockito.mock(ExecutorService.class);
final MonitorScheduler scheduler = new MonitorScheduler(
Mockito.mock(MonitorSchedulerConfig.class),
Execs.scheduledSingleThreaded("monitor-scheduler-test"),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("monitor-scheduler-test").build(),
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor1, monitor2)
ImmutableList.of(monitor1, monitor2),
executor
);
final Optional<Monitor1> maybeFound1 = scheduler.findMonitor(Monitor1.class);
@ -62,7 +89,264 @@ public class MonitorSchedulerTest
Assert.assertFalse(scheduler.findMonitor(Monitor3.class).isPresent());
}
@Test
public void testStart_RepeatScheduling()
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
Mockito.doAnswer(new Answer<Future<?>>()
{
@Override
public Future<Boolean> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[0];
((Callable<Boolean>) originalArgument).call();
return CompletableFuture.completedFuture(Boolean.TRUE);
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Monitor monitor = Mockito.mock(Monitor.class);
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new MonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
scheduler.start();
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(2)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(2)).monitor(ArgumentMatchers.any());
}
@Test
public void testStart_RepeatAndStopScheduling()
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Mockito.doAnswer(new Answer<Future<?>>()
{
private int scheduleCount = 0;
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
Mockito.doAnswer(new Answer<Future<?>>()
{
@Override
public Future<Boolean> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[0];
((Callable<Boolean>) originalArgument).call();
return CompletableFuture.completedFuture(Boolean.FALSE);
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
while (scheduleCount < 2) {
scheduleCount++;
task.run(123L);
}
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
final MonitorScheduler scheduler = new MonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
scheduler.start();
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
Mockito.verify(monitor, Mockito.times(1)).stop();
}
@Test
public void testStart_UnexpectedExceptionWhileMonitoring()
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
Mockito.when(monitor.monitor(ArgumentMatchers.any(ServiceEmitter.class))).thenThrow(new RuntimeException());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
Mockito.doAnswer(new Answer<Future<?>>()
{
@Override
public Future<Boolean> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[0];
((Callable<Boolean>) originalArgument).call();
return CompletableFuture.completedFuture(Boolean.TRUE);
}
}).when(executor).submit(ArgumentMatchers.any(Callable.class));
task.run(123L);
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
final MonitorScheduler scheduler = new MonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
scheduler.start();
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
Mockito.verify(monitor, Mockito.times(1)).monitor(ArgumentMatchers.any());
}
@Test
public void testStart_UnexpectedExceptionWhileScheduling()
{
ExecutorService executor = Mockito.mock(ExecutorService.class);
Monitor monitor = Mockito.mock(Monitor.class);
Mockito.when(monitor.getScheduledFuture()).thenReturn(createDummyFuture());
MonitorSchedulerConfig config = Mockito.mock(MonitorSchedulerConfig.class);
Mockito.when(config.getEmitterPeriod()).thenReturn(new org.joda.time.Duration(1000L));
Mockito.doAnswer(new Answer<Future<?>>()
{
@SuppressWarnings("unchecked")
@Override
public Future<?> answer(InvocationOnMock invocation) throws Exception
{
final Object originalArgument = (invocation.getArguments())[3];
CronTask task = ((CronTask) originalArgument);
Mockito.when(executor.submit(ArgumentMatchers.any(Callable.class))).thenThrow(new RuntimeException());
task.run(123L);
return createDummyFuture();
}
}).when(cronScheduler).scheduleAtFixedRate(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
final MonitorScheduler scheduler = new MonitorScheduler(
config,
cronScheduler,
Mockito.mock(ServiceEmitter.class),
ImmutableList.of(monitor),
executor);
scheduler.start();
Mockito.verify(monitor, Mockito.times(1)).start();
Mockito.verify(cronScheduler, Mockito.times(1)).scheduleAtFixedRate(ArgumentMatchers.anyLong(),
ArgumentMatchers.anyLong(),
ArgumentMatchers.any(), ArgumentMatchers.any(CronTask.class));
Mockito.verify(executor, Mockito.times(1)).submit(ArgumentMatchers.any(Callable.class));
}
private Future createDummyFuture()
{
Future<?> future = new Future()
{
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
return false;
}
@Override
public boolean isCancelled()
{
return false;
}
@Override
public boolean isDone()
{
return false;
}
@Override
public Object get()
{
return null;
}
@Override
public Object get(long timeout, TimeUnit unit)
{
return null;
}
};
return future;
}
private static class NoopMonitor implements Monitor
{
@Override
@ -82,5 +366,19 @@ public class MonitorSchedulerTest
{
return true;
}
@Override
public Future<?> getScheduledFuture()
{
// TODO Auto-generated method stub
return null;
}
@Override
public void setScheduledFuture(Future<?> scheduledFuture)
{
// TODO Auto-generated method stub
}
}
}

View File

@ -345,6 +345,16 @@ libraries:
---
name: CronScheduler
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 0.1
libraries:
- io.timeandspace: cron-scheduler
---
name: LMAX Disruptor
license_category: binary
module: java-core

View File

@ -1255,6 +1255,11 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.timeandspace</groupId>
<artifactId>cron-scheduler</artifactId>
<version>0.1</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -311,6 +311,10 @@
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
<dependency>
<groupId>io.timeandspace</groupId>
<artifactId>cron-scheduler</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -27,6 +27,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import io.timeandspace.cronscheduler.CronScheduler;
import org.apache.druid.guice.DruidBinders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@ -42,6 +43,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.SysMonitor;
import org.apache.druid.query.ExecutorServiceMonitor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -106,9 +108,10 @@ public class MetricsModule implements Module
return new MonitorScheduler(
config.get(),
Execs.scheduledSingleThreaded("MonitorScheduler-%s"),
CronScheduler.newBuilder(Duration.ofSeconds(1L)).setThreadName("MonitorSchedulerThread").build(),
emitter,
monitors
monitors,
Execs.multiThreaded(64, "MonitorThread-%d")
);
}