Handle nullable taskTypes for rolling upgrade (#5309)

This commit is contained in:
Jihoon Son 2018-01-30 13:32:54 -08:00 committed by Gian Merlino
parent 64ee65856e
commit 3a69b0e513
4 changed files with 23 additions and 3 deletions

View File

@ -39,7 +39,7 @@ public class TaskStatusPlus
@JsonCreator @JsonCreator
public TaskStatusPlus( public TaskStatusPlus(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("type") String type, @JsonProperty("type") @Nullable String type, // nullable for backward compatibility
@JsonProperty("createdTime") DateTime createdTime, @JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("state") @Nullable TaskState state, @JsonProperty("state") @Nullable TaskState state,
@ -65,6 +65,7 @@ public class TaskStatusPlus
return id; return id;
} }
@Nullable
@JsonProperty @JsonProperty
public String getType() public String getType()
{ {
@ -83,12 +84,14 @@ public class TaskStatusPlus
return queueInsertionTime; return queueInsertionTime;
} }
@Nullable
@JsonProperty @JsonProperty
public TaskState getState() public TaskState getState()
{ {
return state; return state;
} }
@Nullable
@JsonProperty @JsonProperty
public Long getDuration() public Long getDuration()
{ {

View File

@ -27,6 +27,8 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.DateTimes;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
/** /**
* A holder for a task and different components associated with the task * A holder for a task and different components associated with the task
*/ */
@ -85,6 +87,11 @@ public abstract class TaskRunnerWorkItem
} }
public abstract TaskLocation getLocation(); public abstract TaskLocation getLocation();
/**
* Returns the type of task. The return value can be null for backward compatibility.
*/
@Nullable
public abstract String getTaskType(); public abstract String getTaskType();
@Override @Override

View File

@ -517,7 +517,10 @@ public class OverlordResource
} else { } else {
workItems = taskRunner.getRunningTasks() workItems = taskRunner.getRunningTasks()
.stream() .stream()
.filter(workitem -> workitem.getTaskType().equals(taskType)) .filter(workitem -> {
final String itemType = workitem.getTaskType();
return itemType != null && itemType.equals(taskType);
})
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

View File

@ -72,7 +72,14 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
final int numRunningCompactTasks = indexingServiceClient final int numRunningCompactTasks = indexingServiceClient
.getRunningTasks() .getRunningTasks()
.stream() .stream()
.filter(status -> status.getType().equals(COMPACT_TASK_TYPE)) .filter(status -> {
final String taskType = status.getType();
// taskType can be null if middleManagers are running with an older version. Here, we consevatively regard
// the tasks of the unknown taskType as the compactionTask. This is because it's important to not run
// compactionTasks more than the configured limit at any time which might impact to the ingestion
// performance.
return taskType == null || taskType.equals(COMPACT_TASK_TYPE);
})
.collect(Collectors.toList()) .collect(Collectors.toList())
.size(); .size();
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources); final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);