Cleanup logic from handoff API (#16457)

* Cleanup logic from handoff API

* Fix test

* Fix checkstyle

* Update docs
This commit is contained in:
George Shiqi Wu 2024-05-16 08:42:44 -07:00 committed by GitHub
parent 435b58f101
commit ed9881df88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 18 additions and 26 deletions

View File

@ -3594,17 +3594,7 @@ Content-Type: application/json
<details>
<summary>View the response</summary>
```json
{
"id": "social_media",
"taskGroupIds": [
1,
2,
3
]
}
```
(empty response)
</details>
### Shut down a supervisor

View File

@ -422,7 +422,7 @@ public class SupervisorResource
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
return Response.ok(ImmutableMap.of("id", id, "taskGroupIds", taskGroupIds)).build();
return Response.ok().build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("Supervisor was not found [%s]", id)))

View File

@ -203,7 +203,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final String baseSequenceName;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
boolean shutdownEarly = false; // set by SupervisorManager.stopTaskGroupEarly
boolean handoffEarly = false; // set by SupervisorManager.stopTaskGroupEarly
TaskGroup(
int groupId,
@ -268,14 +268,14 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return tasks.keySet();
}
void setShutdownEarly()
void setHandoffEarly()
{
shutdownEarly = true;
handoffEarly = true;
}
Boolean getShutdownEarly()
Boolean getHandoffEarly()
{
return shutdownEarly;
return handoffEarly;
}
@VisibleForTesting
@ -690,8 +690,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
log.info("Tried to stop task group [%d] for supervisor [%s] that wasn't actively reading.", taskGroupId, supervisorId);
continue;
}
taskGroup.setShutdownEarly();
log.info("Task group [%d] for supervisor [%s] will handoff early.", taskGroupId, supervisorId);
taskGroup.setHandoffEarly();
}
}
@ -3194,7 +3194,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
} else {
DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getShutdownEarly()) {
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
// If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
@ -3202,7 +3202,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.stream()
.mapToInt(CopyOnWriteArrayList::size)
.sum() + stoppedTasks.get()
< ioConfig.getMaxAllowedStops() || group.getShutdownEarly()) {
< ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
log.info(
"Task group [%d] has run for [%s]. Stopping.",
groupId,

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
@ -95,9 +94,13 @@ public interface Supervisor
int getActiveTaskGroupsCount();
/** Handoff the task group with id=taskGroupId the next time the supervisor runs regardless of task run time*/
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
throw new NotImplementedException("Supervisor does not have the feature to handoff task groups early implemented");
throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
@ -97,7 +96,7 @@ public class NoopSupervisorSpecTest
{
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
Supervisor noOpSupervisor = expectedSpec.createSupervisor();
Assert.assertThrows(NotImplementedException.class,
Assert.assertThrows(UnsupportedOperationException.class,
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}