Enhance rolling Supervisor restarts at taskDuration (#15859)

This commit is contained in:
YongGang 2024-02-14 15:44:34 -08:00 committed by GitHub
parent c324e37751
commit 19ed5c863f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 288 additions and 33 deletions

View File

@ -79,6 +79,8 @@ public class KafkaSupervisorIOConfigTest
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertNull(config.getStopTaskCount());
Assert.assertEquals((int) config.getTaskCount(), config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(100, config.getPollTimeout());

View File

@ -62,6 +62,8 @@ public class KinesisSupervisorIOConfigTest
Assert.assertEquals(KinesisRegion.US_EAST_1.getEndpoint(), config.getEndpoint());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
Assert.assertNull(config.getStopTaskCount());
Assert.assertEquals((int) config.getTaskCount(), config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());

View File

@ -3105,45 +3105,59 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();
boolean stopTasksEarly = false;
boolean stopTasksEarly;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");
earlyStopTime = null;
stopTasksEarly = true;
} else {
stopTasksEarly = false;
}
int stoppedTasks = 0;
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
AtomicInteger stoppedTasks = new AtomicInteger();
// Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
activelyReadingTaskGroups
.entrySet().stream().sorted(
Comparator.comparingLong(
(Entry<Integer, TaskGroup> entry) ->
computeEarliestTaskStartTime(entry.getValue())
.getMillis()))
.forEach(entry -> {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
if (stopTasksEarly) {
log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
} else {
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks
< ioConfig.getStopTaskCount()) {
log.info("Task group [%d] has run for [%s]. Stopping.", groupId, ioConfig.getTaskDuration());
if (stopTasksEarly) {
log.info(
"Stopping task group [%d] early. It has run for [%s]",
groupId,
ioConfig.getTaskDuration()
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks++;
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
if (pendingCompletionTaskGroups.values()
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,
ioConfig.getTaskDuration()
);
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks.getAndIncrement();
}
}
}
}
}
}
});
List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results = coalesceAndAwait(futures);
for (int j = 0; j < results.size(); j++) {
@ -3200,6 +3214,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
private DateTime computeEarliestTaskStartTime(TaskGroup group)
{
return group.tasks.values().stream()
.filter(taskData -> taskData.startTime != null)
.map(taskData -> taskData.startTime)
.min(DateTime::compareTo)
.orElse(DateTimes.nowUtc());
}
private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> checkpointTaskGroup(
final TaskGroup taskGroup,
final boolean finalize

View File

@ -49,8 +49,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;
private final int stopTaskCount;
@Nullable private final Integer stopTaskCount;
public SeekableStreamSupervisorIOConfig(
String stream,
@ -81,8 +80,9 @@ public abstract class SeekableStreamSupervisorIOConfig
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount;
Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0");
Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
"stopTaskCount must be greater than 0");
this.stopTaskCount = stopTaskCount;
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
@ -205,9 +205,15 @@ public abstract class SeekableStreamSupervisorIOConfig
return idleConfig;
}
@Nullable
@JsonProperty
public int getStopTaskCount()
public Integer getStopTaskCount()
{
return stopTaskCount;
}
public int getMaxAllowedStops()
{
return stopTaskCount == null ? taskCount : stopTaskCount;
}
}

View File

@ -979,6 +979,228 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}
@Test(timeout = 60_000L)
public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws InterruptedException, JsonProcessingException
{
// Assuming tasks have surpassed their duration limit at test execution
DateTime startTime = DateTimes.nowUtc().minusHours(2);
// Configure supervisor to stop only one task at a time
int stopTaskCount = 1;
SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
1,
3,
new Period("PT1H"),
new Period("PT1S"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
null,
new IdleConfig(true, 200L),
stopTaskCount
)
{
};
EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
{
@Override
public Duration getEmissionDuration()
{
return new Period("PT2S").toStandardDuration();
}
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();
TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
Map<String, Object> context = new HashMap<>();
context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));
TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
"id1",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
0,
Collections.singletonMap("0", "10"),
Collections.singletonMap("0", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"0"
);
TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
"id2",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
1,
Collections.singletonMap("1", "10"),
Collections.singletonMap("1", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"1"
);
TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
"id3",
null,
getDataSchema(),
taskTuningConfig,
createTaskIoConfigExt(
2,
Collections.singletonMap("2", "10"),
Collections.singletonMap("2", "20"),
"test",
startTime,
null,
Collections.emptySet(),
ioConfig
),
context,
"2"
);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
.andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
.andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
.anyTimes();
// Mocking to return the earliest start time for task id2, indicating it's the first group to start
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2"))
.andReturn(Futures.immediateFuture(startTime)).anyTimes();
EasyMock.expect(indexTaskClient.getStartTimeAsync("id3"))
.andReturn(Futures.immediateFuture(startTime.plusSeconds(2)))
.anyTimes();
ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, partitionOffset);
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, false))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.resumeAsync("id3"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id1"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
EasyMock.expect(indexTaskClient.pauseAsync("id3"))
.andReturn(Futures.immediateFuture(true))
.anyTimes();
// Expect the earliest-started task (id2) to transition to publishing first
taskQueue.shutdown("id2", "All tasks in group[%s] failed to transition to publishing state", 1);
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.runInternal();
supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
)
);
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
verifyAll();
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}
@Test
public void testEmitBothLag() throws Exception
{