In order to keep track of restarted tasks, `allocationIdOnLastStatusUpdate` field was added to `PersistentTaskInProgress` class.

This will allow persistent task implementors to detect whether the executor node has changed or has been unset since the last status update has occured.
This commit is contained in:
Martijn van Groningen 2017-02-17 14:16:33 +01:00
parent 16e661c34b
commit 479429c6ef
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
2 changed files with 37 additions and 7 deletions

View File

@ -70,4 +70,5 @@ public class PersistentTask extends CancellableTask {
public void setPersistentTaskId(long persistentTaskId) {
this.persistentTaskId = persistentTaskId;
}
}

View File

@ -106,6 +106,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
taskBuilder.setStatus(builder.status);
}, ACTION_PARSER, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}
/**
@ -207,24 +209,28 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private final Status status;
@Nullable
private final String executorNode;
@Nullable
private final Long allocationIdOnLastStatusUpdate;
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
String executorNode) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode);
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode, null);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, String newExecutorNode) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
newExecutorNode);
newExecutorNode, task.allocationId);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode);
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
task.executorNode, task.allocationId);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status, String executorNode) {
boolean stopped, boolean removeOnCompletion, Status status,
String executorNode, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
@ -233,6 +239,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.executorNode = executorNode;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
}
@ -247,6 +254,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
allocationIdOnLastStatusUpdate = in.readOptionalLong();
}
@Override
@ -259,6 +267,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
}
@Override
@ -273,12 +282,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode);
Objects.equals(executorNode, that.executorNode) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode,
allocationIdOnLastStatusUpdate);
}
@Override
@ -320,6 +331,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return removeOnCompletion;
}
/**
* @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
* to a new executor node and the status hasn't been updated then the task status is stale.
*/
public boolean isCurrentStatus() {
return allocationIdOnLastStatusUpdate == allocationId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -342,6 +361,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.field("executor_node", executorNode);
if (allocationIdOnLastStatusUpdate != null) {
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
@ -365,6 +387,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private boolean removeOnCompletion;
private Status status;
private String executorNode;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) {
this.id = id;
@ -407,8 +430,14 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return this;
}
public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode);
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
executorNode, allocationIdOnLastStatusUpdate);
}
}