This commit is contained in:
Adithya Chakilam 2024-10-07 16:45:13 -05:00
parent e5d33af19b
commit 38ea363318
2 changed files with 45 additions and 2 deletions

View File

@ -4470,7 +4470,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
protected void emitTaskCount()
public void emitTaskCount()
{
try {
ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder()
@ -4492,7 +4492,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
));
}
catch (Exception e) {
log.warn(e, "Unable to active/publisihing task count");
log.warn(e, "Unable to publish active/publisihing task count");
}
}

View File

@ -1800,6 +1800,32 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
@Test
public void testEmitTaskCounts() throws Exception
{
expectEmitterSupervisor(false);
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
TestEmittingTestSeekableStreamSupervisor.TASK_COUNTS,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
latch.await();
emitter.verifyEmitted("task/supervisor/active/count", 1);
emitter.verifyEmitted("task/supervisor/active/count", 1);
verifyAll();
}
@Test
public void testGetStats()
{
@ -3001,6 +3027,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private static final byte LAG = 0x01;
private static final byte NOTICE_QUEUE = 0x02;
private static final byte NOTICE_PROCESS = 0x04;
private static final byte TASK_COUNTS = 0x08;
TestEmittingTestSeekableStreamSupervisor(
@ -3062,6 +3089,16 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
latch.countDown();
}
@Override
public void emitTaskCount()
{
if ((metricFlag & TASK_COUNTS) == 0) {
return;
}
super.emitTaskCount();
latch.countDown();
}
@Override
public LagStats computeLagStats()
{
@ -3084,6 +3121,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitTaskCount,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
}