Fixes Kafka Supervisor Lag Report (#13380)

Fixes the supervisor lag report so that each task's lag map no longer includes every stream partition.

PR #13144 (Adds Idle feature to `SeekableStreamSupervisor` for inactive stream) - https://github.com/apache/druid/pull/13144 - changed the lag calculation map in `KafkaSupervisor` to cover all of the latest partitions from the stream, so that the idle state can be set correctly, instead of computing lag only for the partitions actively being read from the stream. This led to an explosion of metrics in lag reports when a supervisor runs thousands of tasks.
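For example, with, say, 1,000 tasks reading from a 1,000-partition topic, putting every partition into every task's lag map yields roughly 1,000,000 lag entries per report, whereas reporting only the partitions each task actually reads keeps it near 1,000.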

Changes:
- Add a new method that, when building the supervisor reports, generates lag only for the partitions a single task is actively reading; a simplified sketch of the idea follows.
Tejaswini Bandlamudi 2022-11-17 22:24:45 +05:30 committed by GitHub
parent 6b9344cd39
commit bf10ff73a8
2 changed files with 173 additions and 4 deletions

KafkaSupervisor.java

@@ -160,7 +160,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
)
{
KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
- Map<Integer, Long> partitionLag = getRecordLagPerPartition(getHighestCurrentOffsets());
+ Map<Integer, Long> partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
return new KafkaSupervisorReportPayload(
spec.getDataSchema().getDataSource(),
ioConfig.getTopic(),
@@ -260,7 +260,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
);
}
- return getRecordLagPerPartition(highestCurrentOffsets);
+ return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
}
@Nullable
@@ -271,10 +271,10 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
return null;
}
- @Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
- protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
+ // Used while calculating cumulative lag for entire stream
+ private Map<Integer, Long> getRecordLagPerPartitionInLatestSequences(Map<Integer, Long> currentOffsets)
{
if (latestSequenceFromStream == null) {
return Collections.emptyMap();
@@ -293,6 +293,30 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
);
}
@Override
// suppress use of CollectionUtils.mapValues() since the valueMapper function is dependent on map key here
@SuppressWarnings("SSBasedInspection")
// Used while generating Supervisor lag reports per task
protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
{
if (latestSequenceFromStream == null || currentOffsets == null) {
return Collections.emptyMap();
}
return currentOffsets
.entrySet()
.stream()
.filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() != null
? latestSequenceFromStream.get(e.getKey()) - e.getValue()
: 0
)
);
}
@Override
protected Map<Integer, Long> getTimeLagPerPartition(Map<Integer, Long> currentOffsets)
{

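In the resulting code, `getRecordLagPerPartitionInLatestSequences` supplies the stream-wide lag used for the report payload and the idle-state determination described above, while the overridden `getRecordLagPerPartition` is applied to each task's current offsets when per-task lag is reported. The new test below checks that split: each task's lag map contains only its own partitions, while the payload-level latest offsets and minimum lag still cover the whole stream.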
KafkaSupervisorTest.java

@@ -1858,6 +1858,151 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}
@Test
public void testReportWhenMultipleActiveTasks() throws Exception
{
final TaskLocation location1 = new TaskLocation("testHost", 1234, -1);
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
supervisor = getTestableSupervisorForIdleBehaviour(1, 2, true, "PT10S", null, null, false, null);
addSomeEvents(6);
Task id1 = createKafkaIndexTask(
"id1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(0, 2L, 2, 1L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
Task id2 = createKafkaIndexTask(
"id2",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of(1, 3L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
"topic",
ImmutableMap.of(1, Long.MAX_VALUE)
),
null,
null,
supervisor.getTuningConfig()
);
List<Task> existingTasks = ImmutableList.of(id1, id2);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(existingTasks).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
.anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
replayAll();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
0,
ImmutableMap.of(0, 0L, 2, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
ImmutableMap.of(1, 0L),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("id2"),
ImmutableSet.of()
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
verifyAll();
EasyMock.reset(taskClient);
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(0, 2L, 2, 1L)));
EasyMock.expect(taskClient.getCurrentOffsetsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(ImmutableMap.of(1, 3L)));
EasyMock.replay(taskClient);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
SupervisorReport<KafkaSupervisorReportPayload> report = supervisor.getStatus();
verifyAll();
Assert.assertEquals(DATASOURCE, report.getId());
KafkaSupervisorReportPayload payload = report.getPayload();
Assert.assertEquals(DATASOURCE, payload.getDataSource());
Assert.assertEquals(10L, payload.getDurationSeconds());
Assert.assertEquals(NUM_PARTITIONS, payload.getPartitions());
Assert.assertEquals(1, payload.getReplicas());
Assert.assertEquals(topic, payload.getStream());
Assert.assertEquals(2, payload.getActiveTasks().size());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState());
Assert.assertEquals(0, payload.getRecentErrors().size());
TaskReportData id1TaskReport = payload.getActiveTasks().get(0);
TaskReportData id2TaskReport = payload.getActiveTasks().get(1);
Assert.assertEquals("id2", id2TaskReport.getId());
Assert.assertEquals(ImmutableMap.of(1, 0L), id2TaskReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(1, 3L), id2TaskReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(1, 4L), id2TaskReport.getLag());
Assert.assertEquals("id1", id1TaskReport.getId());
Assert.assertEquals(ImmutableMap.of(0, 0L, 2, 0L), id1TaskReport.getStartingOffsets());
Assert.assertEquals(ImmutableMap.of(0, 2L, 2, 1L), id1TaskReport.getCurrentOffsets());
Assert.assertEquals(ImmutableMap.of(0, 5L, 2, 6L), id1TaskReport.getLag());
Assert.assertEquals(ImmutableMap.of(0, 7L, 1, 7L, 2, 7L), payload.getLatestOffsets());
Assert.assertEquals(ImmutableMap.of(0, 5L, 1, 4L, 2, 6L), payload.getMinimumLag());
Assert.assertEquals(15L, (long) payload.getAggregateLag());
Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow());
}
@Test
public void testSupervisorIsIdleIfStreamInactive() throws Exception
{