Rolling Supervisor restarts at taskDuration (#14396)

* Rolling supervisor task publishing

* add an option for number of task groups to roll over

* better

* remove docs

* oops

* checkstyle

* wip test

* undo partial test change

* remove incomplete test
This commit is contained in:
Suneet Saldanha 2023-08-07 16:24:32 -07:00 committed by GitHub
parent 14940dc3ed
commit b624a4ec4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 75 additions and 31 deletions

View File

@ -65,7 +65,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount
)
{
super(
@ -82,7 +83,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
idleConfig
idleConfig,
stopTaskCount
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");

View File

@ -178,6 +178,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,
@ -237,6 +238,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,
@ -338,6 +340,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,
@ -520,6 +523,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,
@ -574,6 +578,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
),
null,

View File

@ -307,6 +307,7 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
null,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
@ -348,7 +349,8 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class)
mapper.convertValue(idleConfig, IdleConfig.class),
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);

View File

@ -308,7 +308,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
new IdleConfig(true, 1000L)
new IdleConfig(true, 1000L),
1
);
final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
@ -4516,7 +4517,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
null,
null,
idleConfig
idleConfig,
null
);
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@ -4627,6 +4629,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
null,
null,
null,
null
);
@ -4742,6 +4745,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
null,
null,
null,
null
);

View File

@ -93,7 +93,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null)
new IdleConfig(null, null),
null
);
this.endpoint = endpoint != null

View File

@ -1574,6 +1574,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
}
earlyStopTime = DateTimes.EPOCH;
checkTaskDuration();
}
@ -2934,19 +2935,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
boolean stopTasksEarly = false;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");
@ -2955,12 +2943,35 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
stopTasksEarly = true;
}
int stoppedTasks = 0;
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();
// if this task has run longer than the configured duration, signal all tasks in the group to persist
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || stopTasksEarly) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
if (stopTasksEarly) {
log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
} else {
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// This task group has run longer than the configured duration. Stop it only while the number of
// pending task groups plus the groups already stopped in this pass is below the configured stop task count.
if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks
< ioConfig.getStopTaskCount()) {
log.info("Task group [%d] has run for [%s]. Stopping.", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks++;
}
}
}
}

View File

@ -50,6 +50,8 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;
private final int stopTaskCount;
public SeekableStreamSupervisorIOConfig(
String stream,
@Nullable InputFormat inputFormat,
@ -64,7 +66,8 @@ public abstract class SeekableStreamSupervisorIOConfig
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
DateTime lateMessageRejectionStartDateTime,
@Nullable IdleConfig idleConfig
@Nullable IdleConfig idleConfig,
@Nullable Integer stopTaskCount
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
@ -78,6 +81,8 @@ public abstract class SeekableStreamSupervisorIOConfig
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount;
Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0");
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
@ -199,4 +204,10 @@ public abstract class SeekableStreamSupervisorIOConfig
{
return idleConfig;
}
/**
 * Returns the stop task count held by this IO config.
 *
 * <p>The constructor falls back to {@code taskCount} when no explicit value is supplied, and rejects
 * any value that is not greater than 0, so the returned value is always positive.
 *
 * @return the configured stop task count
 */
@JsonProperty
public int getStopTaskCount()
{
return stopTaskCount;
}
}

View File

@ -347,7 +347,8 @@ public class SeekableStreamSamplerSpecTest extends EasyMockSupport
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
idleConfig
idleConfig,
null
);
}
}

View File

@ -865,6 +865,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
null,
null,
null,
null,
null
)
{
@ -919,7 +920,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
null,
null,
null,
new IdleConfig(true, null)
new IdleConfig(true, null),
null
)
{
};
@ -1085,6 +1087,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
null,
mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class),
null,
null,
null
)
{
@ -1104,6 +1107,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
null,
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
null,
null,
null
)
{

View File

@ -492,7 +492,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
new IdleConfig(true, 200L)
new IdleConfig(true, 200L),
null
)
{
}).anyTimes();
@ -1088,6 +1089,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
null,
null
)
{
@ -1148,6 +1150,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
null,
null,
null
)
{