mirror of https://github.com/apache/druid.git
add ingest/notices/queueSize metric to give visibility into supervisor notices queue size (#11417)
This commit is contained in:
parent
b83742179a
commit
995d99d9e4
|
@ -181,6 +181,8 @@ These metrics are only available if the RealtimeMetricsMonitor is included in th
|
||||||
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource, taskId, taskType.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|
|`ingest/handoff/count`|Number of handoffs that happened.|dataSource, taskId, taskType.|Varies. Generally greater than 0 once every segment granular period if cluster operating normally|
|
||||||
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
|
|`ingest/sink/count`|Number of sinks not handoffed.|dataSource, taskId, taskType.|1~3|
|
||||||
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
|
|`ingest/events/messageGap`|Time gap between the data time in event and current system time.|dataSource, taskId, taskType.|Greater than 0, depends on the time carried in event |
|
||||||
|
|`ingest/notices/queueSize`|Number of pending notices to be processed by the coordinator|dataSource.|Typically 0 and occasionally in lower single digits. Should not be a very high number. |
|
||||||
|
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor|dataSource, noticeType.| < 1s. |
|
||||||
|
|
||||||
|
|
||||||
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
|
Note: If the JVM does not support CPU time measurement for the current thread, ingest/merge/cpu and ingest/persists/cpu will be 0.
|
||||||
|
|
|
@ -274,6 +274,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
*/
|
*/
|
||||||
private interface Notice
|
private interface Notice
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Returns a descriptive label for this notice type. Used for metrics emission and logging.
|
||||||
|
*
|
||||||
|
* @return task type label
|
||||||
|
*/
|
||||||
|
String getType();
|
||||||
|
|
||||||
void handle() throws ExecutionException, InterruptedException, TimeoutException;
|
void handle() throws ExecutionException, InterruptedException, TimeoutException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,6 +319,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
|
|
||||||
private class RunNotice implements Notice
|
private class RunNotice implements Notice
|
||||||
{
|
{
|
||||||
|
private static final String TYPE = "run_notice";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle()
|
public void handle()
|
||||||
{
|
{
|
||||||
|
@ -323,12 +332,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
|
|
||||||
runInternal();
|
runInternal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// change taskCount without resubmitting.
|
// change taskCount without resubmitting.
|
||||||
private class DynamicAllocationTasksNotice implements Notice
|
private class DynamicAllocationTasksNotice implements Notice
|
||||||
{
|
{
|
||||||
Callable<Integer> scaleAction;
|
Callable<Integer> scaleAction;
|
||||||
|
private static final String TYPE = "dynamic_allocation_tasks_notice";
|
||||||
|
|
||||||
DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
|
DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
|
||||||
{
|
{
|
||||||
|
@ -382,6 +398,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -458,6 +480,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
|
|
||||||
private class ShutdownNotice implements Notice
|
private class ShutdownNotice implements Notice
|
||||||
{
|
{
|
||||||
|
private static final String TYPE = "shutdown_notice";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle() throws InterruptedException, ExecutionException, TimeoutException
|
public void handle() throws InterruptedException, ExecutionException, TimeoutException
|
||||||
{
|
{
|
||||||
|
@ -468,11 +492,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
stopLock.notifyAll();
|
stopLock.notifyAll();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ResetNotice implements Notice
|
private class ResetNotice implements Notice
|
||||||
{
|
{
|
||||||
final DataSourceMetadata dataSourceMetadata;
|
final DataSourceMetadata dataSourceMetadata;
|
||||||
|
private static final String TYPE = "reset_notice";
|
||||||
|
|
||||||
ResetNotice(DataSourceMetadata dataSourceMetadata)
|
ResetNotice(DataSourceMetadata dataSourceMetadata)
|
||||||
{
|
{
|
||||||
|
@ -484,12 +515,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
{
|
{
|
||||||
resetInternal(dataSourceMetadata);
|
resetInternal(dataSourceMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected class CheckpointNotice implements Notice
|
protected class CheckpointNotice implements Notice
|
||||||
{
|
{
|
||||||
private final int taskGroupId;
|
private final int taskGroupId;
|
||||||
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
|
private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;
|
||||||
|
private static final String TYPE = "checkpoint_notice";
|
||||||
|
|
||||||
CheckpointNotice(
|
CheckpointNotice(
|
||||||
int taskGroupId,
|
int taskGroupId,
|
||||||
|
@ -560,6 +598,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
|
// Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
|
||||||
|
@ -908,7 +952,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
notice.handle();
|
notice.handle();
|
||||||
Instant handleNoticeEndTime = Instant.now();
|
Instant handleNoticeEndTime = Instant.now();
|
||||||
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
|
Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
|
||||||
log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d]", notice.getClass().getName(), timeElapsed.toMillis(), getNoticesQueueSize());
|
String noticeType = notice.getType();
|
||||||
|
log.debug("Handled notice [%s] from notices queue in [%d] ms, current notices queue size [%d] for datasource [%s]", noticeType, timeElapsed.toMillis(), getNoticesQueueSize(), dataSource);
|
||||||
|
emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
|
||||||
}
|
}
|
||||||
catch (Throwable e) {
|
catch (Throwable e) {
|
||||||
stateManager.recordThrowableEvent(e);
|
stateManager.recordThrowableEvent(e);
|
||||||
|
@ -3588,6 +3634,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* default implementation, schedules periodic fetch of latest offsets and {@link #emitLag} reporting for Kafka and Kinesis
|
* default implementation, schedules periodic fetch of latest offsets and {@link #emitLag} reporting for Kafka and Kinesis
|
||||||
|
* and periodic reporting of {@Link #emitNoticesQueueSize} for various data sources.
|
||||||
*/
|
*/
|
||||||
protected void scheduleReporting(ScheduledExecutorService reportingExec)
|
protected void scheduleReporting(ScheduledExecutorService reportingExec)
|
||||||
{
|
{
|
||||||
|
@ -3610,6 +3657,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
||||||
TimeUnit.MILLISECONDS
|
TimeUnit.MILLISECONDS
|
||||||
);
|
);
|
||||||
|
reportingExec.scheduleAtFixedRate(
|
||||||
|
this::emitNoticesQueueSize,
|
||||||
|
ioConfig.getStartDelay().getMillis() + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
|
||||||
|
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3658,6 +3711,39 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
||||||
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
|
&& makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
emitter.emit(
|
||||||
|
ServiceMetricEvent.builder()
|
||||||
|
.setDimension("noticeType", noticeType)
|
||||||
|
.setDimension("dataSource", dataSource)
|
||||||
|
.build("ingest/notices/time", timeInMillis)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.warn(e, "Unable to emit notices process time");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void emitNoticesQueueSize()
|
||||||
|
{
|
||||||
|
if (spec.isSuspended()) {
|
||||||
|
// don't emit metrics if supervisor is suspended
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
emitter.emit(
|
||||||
|
ServiceMetricEvent.builder()
|
||||||
|
.setDimension("dataSource", dataSource)
|
||||||
|
.build("ingest/notices/queueSize", getNoticesQueueSize())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
log.warn(e, "Unable to emit notices queue size");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void emitLag()
|
protected void emitLag()
|
||||||
{
|
{
|
||||||
if (spec.isSuspended() || !stateManager.isSteadyState()) {
|
if (spec.isSuspended() || !stateManager.isSteadyState()) {
|
||||||
|
|
|
@ -88,6 +88,8 @@ import javax.annotation.Nullable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -97,6 +99,7 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
{
|
{
|
||||||
|
@ -625,7 +628,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
{
|
{
|
||||||
expectEmitterSupervisor(false);
|
expectEmitterSupervisor(false);
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
latch,
|
latch,
|
||||||
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
||||||
|
@ -643,19 +646,23 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
|
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
Assert.assertEquals(6, emitter.getEvents().size());
|
List<Event> events = emitter.getEvents();
|
||||||
Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
|
List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag",
|
||||||
Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
|
"ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
|
||||||
Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
|
events = filterMetrics(events, whitelist);
|
||||||
Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
|
Assert.assertEquals(6, events.size());
|
||||||
Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
|
Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
|
||||||
Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
|
Assert.assertEquals(850L, events.get(0).toMap().get("value"));
|
||||||
Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(3).toMap().get("metric"));
|
Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
|
||||||
Assert.assertEquals(45000L, emitter.getEvents().get(3).toMap().get("value"));
|
Assert.assertEquals(500L, events.get(1).toMap().get("value"));
|
||||||
Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(4).toMap().get("metric"));
|
Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
|
||||||
Assert.assertEquals(20000L, emitter.getEvents().get(4).toMap().get("value"));
|
Assert.assertEquals(283L, events.get(2).toMap().get("value"));
|
||||||
Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(5).toMap().get("metric"));
|
Assert.assertEquals("ingest/test/lag/time", events.get(3).toMap().get("metric"));
|
||||||
Assert.assertEquals(15000L, emitter.getEvents().get(5).toMap().get("value"));
|
Assert.assertEquals(45000L, events.get(3).toMap().get("value"));
|
||||||
|
Assert.assertEquals("ingest/test/maxLag/time", events.get(4).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(20000L, events.get(4).toMap().get("value"));
|
||||||
|
Assert.assertEquals("ingest/test/avgLag/time", events.get(5).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(15000L, events.get(5).toMap().get("value"));
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,7 +671,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
{
|
{
|
||||||
expectEmitterSupervisor(false);
|
expectEmitterSupervisor(false);
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
latch,
|
latch,
|
||||||
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
||||||
|
@ -682,13 +689,16 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
|
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
Assert.assertEquals(3, emitter.getEvents().size());
|
List<Event> events = emitter.getEvents();
|
||||||
Assert.assertEquals("ingest/test/lag", emitter.getEvents().get(0).toMap().get("metric"));
|
List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag", "ingest/test/avgLag");
|
||||||
Assert.assertEquals(850L, emitter.getEvents().get(0).toMap().get("value"));
|
events = filterMetrics(events, whitelist);
|
||||||
Assert.assertEquals("ingest/test/maxLag", emitter.getEvents().get(1).toMap().get("metric"));
|
Assert.assertEquals(3, events.size());
|
||||||
Assert.assertEquals(500L, emitter.getEvents().get(1).toMap().get("value"));
|
Assert.assertEquals("ingest/test/lag", events.get(0).toMap().get("metric"));
|
||||||
Assert.assertEquals("ingest/test/avgLag", emitter.getEvents().get(2).toMap().get("metric"));
|
Assert.assertEquals(850L, events.get(0).toMap().get("value"));
|
||||||
Assert.assertEquals(283L, emitter.getEvents().get(2).toMap().get("value"));
|
Assert.assertEquals("ingest/test/maxLag", events.get(1).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(500L, events.get(1).toMap().get("value"));
|
||||||
|
Assert.assertEquals("ingest/test/avgLag", events.get(2).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(283L, events.get(2).toMap().get("value"));
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -697,7 +707,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
{
|
{
|
||||||
expectEmitterSupervisor(false);
|
expectEmitterSupervisor(false);
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
latch,
|
latch,
|
||||||
null,
|
null,
|
||||||
|
@ -715,13 +725,78 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
|
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
Assert.assertEquals(3, emitter.getEvents().size());
|
List<Event> events = emitter.getEvents();
|
||||||
Assert.assertEquals("ingest/test/lag/time", emitter.getEvents().get(0).toMap().get("metric"));
|
List<String> whitelist = Arrays.asList("ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
|
||||||
Assert.assertEquals(45000L, emitter.getEvents().get(0).toMap().get("value"));
|
events = filterMetrics(events, whitelist);
|
||||||
Assert.assertEquals("ingest/test/maxLag/time", emitter.getEvents().get(1).toMap().get("metric"));
|
Assert.assertEquals(3, events.size());
|
||||||
Assert.assertEquals(20000L, emitter.getEvents().get(1).toMap().get("value"));
|
Assert.assertEquals("ingest/test/lag/time", events.get(0).toMap().get("metric"));
|
||||||
Assert.assertEquals("ingest/test/avgLag/time", emitter.getEvents().get(2).toMap().get("metric"));
|
Assert.assertEquals(45000L, events.get(0).toMap().get("value"));
|
||||||
Assert.assertEquals(15000L, emitter.getEvents().get(2).toMap().get("value"));
|
Assert.assertEquals("ingest/test/maxLag/time", events.get(1).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(20000L, events.get(1).toMap().get("value"));
|
||||||
|
Assert.assertEquals("ingest/test/avgLag/time", events.get(2).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(15000L, events.get(2).toMap().get("value"));
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmitNoticesQueueSize() throws Exception
|
||||||
|
{
|
||||||
|
expectEmitterSupervisor(false);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
|
latch,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
supervisor.start();
|
||||||
|
|
||||||
|
Assert.assertTrue(supervisor.stateManager.isHealthy());
|
||||||
|
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
|
||||||
|
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
|
||||||
|
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
|
||||||
|
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
|
||||||
|
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
List<Event> events = emitter.getEvents();
|
||||||
|
List<String> whitelist = Collections.singletonList("ingest/notices/queueSize");
|
||||||
|
events = filterMetrics(events, whitelist);
|
||||||
|
Assert.assertEquals(1, events.size());
|
||||||
|
Assert.assertEquals("ingest/notices/queueSize", events.get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(0, events.get(0).toMap().get("value"));
|
||||||
|
Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
|
||||||
|
verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmitNoticesTime() throws Exception
|
||||||
|
{
|
||||||
|
expectEmitterSupervisor(false);
|
||||||
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
|
latch,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
supervisor.start();
|
||||||
|
supervisor.emitNoticesTime();
|
||||||
|
Assert.assertTrue(supervisor.stateManager.isHealthy());
|
||||||
|
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
|
||||||
|
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
|
||||||
|
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
|
||||||
|
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
|
||||||
|
latch.await();
|
||||||
|
List<Event> events = emitter.getEvents();
|
||||||
|
List<String> whitelist = Collections.singletonList("ingest/notices/time");
|
||||||
|
events = filterMetrics(events, whitelist);
|
||||||
|
Assert.assertEquals(1, events.size());
|
||||||
|
Assert.assertEquals("ingest/notices/time", events.get(0).toMap().get("metric"));
|
||||||
|
Assert.assertEquals(500L, events.get(0).toMap().get("value"));
|
||||||
|
Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
|
||||||
|
Assert.assertEquals("dummyNoticeType", events.get(0).toMap().get("noticeType"));
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -730,7 +805,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
{
|
{
|
||||||
expectEmitterSupervisor(true);
|
expectEmitterSupervisor(true);
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
|
||||||
latch,
|
latch,
|
||||||
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
|
||||||
|
@ -748,10 +823,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
|
|
||||||
|
|
||||||
latch.await();
|
latch.await();
|
||||||
Assert.assertEquals(0, emitter.getEvents().size());
|
List<Event> events = emitter.getEvents();
|
||||||
|
List<String> whitelist = Arrays.asList("ingest/test/lag", "ingest/test/maxLag",
|
||||||
|
"ingest/test/avgLag", "ingest/test/lag/time", "ingest/test/maxLag/time", "ingest/test/avgLag/time");
|
||||||
|
events = filterMetrics(events, whitelist);
|
||||||
|
Assert.assertEquals(0, events.size());
|
||||||
verifyAll();
|
verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
|
||||||
|
{
|
||||||
|
List<Event> result = events.stream()
|
||||||
|
.filter(e -> whitelist.contains(e.toMap().get("metric").toString()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException
|
private void expectEmitterSupervisor(boolean suspended) throws EntryExistsException
|
||||||
{
|
{
|
||||||
spec = createMock(SeekableStreamSupervisorSpec.class);
|
spec = createMock(SeekableStreamSupervisorSpec.class);
|
||||||
|
@ -1249,6 +1336,19 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void emitNoticesQueueSize()
|
||||||
|
{
|
||||||
|
super.emitNoticesQueueSize();
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void emitNoticesTime()
|
||||||
|
{
|
||||||
|
super.emitNoticeProcessTime("dummyNoticeType", 500);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LagStats computeLagStats()
|
public LagStats computeLagStats()
|
||||||
{
|
{
|
||||||
|
@ -1265,6 +1365,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
||||||
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
||||||
TimeUnit.MILLISECONDS
|
TimeUnit.MILLISECONDS
|
||||||
);
|
);
|
||||||
|
reportingExec.scheduleAtFixedRate(
|
||||||
|
this::emitNoticesQueueSize,
|
||||||
|
ioConfig.getStartDelay().getMillis(),
|
||||||
|
spec.getMonitorSchedulerConfig().getEmitterPeriod().getMillis(),
|
||||||
|
TimeUnit.MILLISECONDS
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1276,6 +1276,7 @@ nativeQueryIds
|
||||||
netAddress
|
netAddress
|
||||||
netHwaddr
|
netHwaddr
|
||||||
netName
|
netName
|
||||||
|
noticeType
|
||||||
numComplexMetrics
|
numComplexMetrics
|
||||||
numDimensions
|
numDimensions
|
||||||
numMetrics
|
numMetrics
|
||||||
|
|
Loading…
Reference in New Issue