Add persistent task assignment explanations.

This commit allows persistent actions to indicate why a task was or wasn't assigned to a certain node.

Original commit: elastic/x-pack-elasticsearch@cdacf9b10f
This commit is contained in:
Igor Motov 2017-02-21 21:05:50 -05:00
parent d87926ab86
commit db88cc458d
13 changed files with 335 additions and 154 deletions

View File

@ -56,6 +56,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
@ -306,8 +307,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
public Assignment getAssignment(Request request, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
// TODO: Add proper explanation
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new Assignment(discoveryNode.getId(), "");
}
}
@Override
@ -360,7 +367,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
PersistentTaskInProgress<?> task = MlMetadata.getJobTask(jobId, tasks);
JobState jobState = MlMetadata.getJobState(jobId, tasks);
if (task != null && jobState == JobState.OPENED) {
if (task.getExecutorNode() == null) {
if (task.isAssigned() == false) {
// We can skip the job state check below, because the task got unassigned after we went into
// opened state on a node that disappeared and we didn't have the opportunity to set the status to failed
return;

View File

@ -53,6 +53,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.TransportPersistentAction;
@ -313,8 +314,14 @@ public class StartDatafeedAction
}
@Override
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectNode(logger, request, clusterState);
public Assignment getAssignment(Request request, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectNode(logger, request, clusterState);
// TODO: Add proper explanation
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new Assignment(discoveryNode.getId(), "");
}
}
@Override
@ -359,7 +366,7 @@ public class StartDatafeedAction
PersistentTaskInProgress<?> datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks);
if (datafeedTask != null && datafeedState == DatafeedState.STARTED) {
if (datafeedTask.getExecutorNode() == null) {
if (datafeedTask.isAssigned() == false) {
// We can skip the datafeed state check below, because the task got unassigned after we went into
// started state on a node that disappeared and we didn't have the opportunity to set the status to stopped
return;

View File

@ -65,7 +65,7 @@ public abstract class TransportJobTaskAction<OperationTask extends Task, Request
JobManager.getJobOrThrowIfUnknown(state, jobId);
PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress.PersistentTaskInProgress<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
if (jobTask == null || jobTask.getExecutorNode() == null) {
if (jobTask == null || jobTask.isAssigned() == false) {
listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED +
"], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT));
} else {

View File

@ -20,11 +20,10 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Component that runs only on the master node and is responsible for assigning running tasks to nodes
@ -55,13 +54,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final String executorNodeId;
final Assignment assignment;
if (stopped) {
executorNodeId = null; // the task is stopped no need to assign it anywhere yet
assignment = PersistentTasksInProgress.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere
} else {
executorNodeId = executorNode(action, currentState, request);
assignment = getAssignement(action, currentState, request);
}
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, executorNodeId));
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment));
}
@Override
@ -100,7 +99,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
if (tasksInProgress.hasTask(id)) {
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
tasksInProgress.reassignTask(id, (action, request) -> executorNode(action, currentState, request));
tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request));
} else {
tasksInProgress.finishTask(id);
}
@ -137,7 +136,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id)) {
return update(currentState, tasksInProgress
.assignTask(id, (action, request) -> executorNode(action, currentState, request)));
.assignTask(id, (action, request) -> getAssignement(action, currentState, request)));
} else {
throw new ResourceNotFoundException("the task with id {} doesn't exist", id);
}
@ -216,26 +215,17 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
});
}
private <Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request) {
private <Request extends PersistentActionRequest> Assignment getAssignement(String action, ClusterState currentState, Request request) {
TransportPersistentAction<Request> persistentAction = registry.getPersistentActionSafe(action);
persistentAction.validate(request, currentState);
DiscoveryNode executorNode = persistentAction.executorNode(request, currentState);
final String executorNodeId;
if (executorNode == null) {
// The executor node not available yet, we will create task with empty executor node and try
// again later
executorNodeId = null;
} else {
executorNodeId = executorNode.getId();
}
return executorNodeId;
return persistentAction.getAssignment(request, currentState);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
if (reassignmentRequired(event, this::executorNode)) {
if (reassignmentRequired(event, this::getAssignement)) {
logger.trace("task reassignment is needed");
reassignTasks();
} else {
@ -245,7 +235,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
}
interface ExecutorNodeDecider {
<Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request);
<Request extends PersistentActionRequest> Assignment getAssignment(String action, ClusterState currentState, Request request);
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
@ -257,18 +247,16 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
if (taskInProgress.isStopped() == false) { // skipping stopped tasks
if (taskInProgress.getExecutorNode() == null || removedNodes.contains(taskInProgress.getExecutorNode())) {
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getRequest(),
decider.executorNode(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) {
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
}
}
}
}
return reassignmentRequired;
@ -283,7 +271,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return reassignTasks(currentState, logger, PersistentTaskClusterService.this::executorNode);
return reassignTasks(currentState, logger, PersistentTaskClusterService.this::getAssignement);
}
@Override
@ -306,16 +294,15 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.isStopped() == false &&
(task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false)) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
String executorNode = decider.executorNode(task.getAction(), clusterState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
Assignment assignment = decider.getAssignment(task.getAction(), clusterState, task.getRequest());
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getExecutorNode(), executorNode);
clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), executorNode));
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());
clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), assignment));
} else {
logger.trace("ignoring task {} because executor nodes are the same {}", task.getId(), executorNode);
logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment);
}
} else {
if (task.isStopped()) {

View File

@ -9,12 +9,14 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
@ -36,6 +38,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.MetaData.ALL_CONTEXTS;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
/**
* A cluster state record that contains a list of all running persistent tasks
@ -61,6 +64,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
public static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER =
new ObjectParser<>("running_tasks", TaskBuilder::new);
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
public static final NamedObjectParser<ActionDescriptionBuilder<PersistentActionRequest>, Void> ACTION_PARSER;
static {
@ -77,6 +83,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
ACTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new ActionDescriptionBuilder<>(name), name);
// Assignment parser
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
@ -92,7 +102,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
taskBuilder.setRequest(builder.request);
taskBuilder.setStatus(builder.status);
}, ACTION_PARSER, new ParseField("action"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
new ParseField("allocation_id_on_last_status_update"));
}
@ -165,7 +175,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
}
public long getNumberOfTasksOnNode(String nodeId, String action) {
return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.assignment.executorNode)).count();
}
@Override
@ -182,6 +192,54 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build();
}
public static class Assignment {
@Nullable
private final String executorNode;
private final String explanation;
public Assignment(String executorNode, String explanation) {
this.executorNode = executorNode;
assert explanation != null;
this.explanation = explanation;
}
@Nullable
public String getExecutorNode() {
return executorNode;
}
public String getExplanation() {
return explanation;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Assignment that = (Assignment) o;
return Objects.equals(executorNode, that.executorNode) &&
Objects.equals(explanation, that.explanation);
}
@Override
public int hashCode() {
return Objects.hash(executorNode, explanation);
}
public boolean isAssigned() {
return executorNode != null;
}
@Override
public String toString() {
return "node: [" + executorNode + "], explanation: [" + explanation +"]";
}
}
public static final Assignment INITIAL_ASSIGNMENT = new Assignment(null, "waiting for initial assignment");
public static final Assignment FINISHED_TASK_ASSIGNMENT = new Assignment(null, "task has finished");
/**
* A record that represents a single running persistent task
*/
@ -194,30 +252,29 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private final boolean removeOnCompletion;
@Nullable
private final Status status;
@Nullable
private final String executorNode;
private final Assignment assignment;
@Nullable
private final Long allocationIdOnLastStatusUpdate;
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
String executorNode) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, executorNode, null);
Assignment assignment) {
this(id, 0L, action, request, stopped, removeOnCompletion, null, assignment, null);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, String newExecutorNode) {
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
newExecutorNode, task.allocationId);
assignment, task.allocationId);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
task.executorNode, task.allocationId);
task.assignment, task.allocationId);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
boolean stopped, boolean removeOnCompletion, Status status,
String executorNode, Long allocationIdOnLastStatusUpdate) {
Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
@ -225,7 +282,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
this.status = status;
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.executorNode = executorNode;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
@ -240,7 +297,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
}
@ -253,7 +310,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
}
@ -269,13 +327,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode,
return Objects.hash(id, allocationId, action, request, stopped, removeOnCompletion, status, assignment,
allocationIdOnLastStatusUpdate);
}
@ -302,7 +360,22 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
@Nullable
public String getExecutorNode() {
return executorNode;
return assignment.executorNode;
}
public Assignment getAssignment() {
return assignment;
}
public boolean isAssigned() {
return assignment.isAssigned();
}
/**
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return isStopped() == false && (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}
@Nullable
@ -347,7 +420,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
if (API_CONTEXT.equals(params.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.field("executor_node", executorNode);
builder.startObject("assignment");
{
builder.field("executor_node", assignment.executorNode);
builder.field("explanation", assignment.explanation);
}
builder.endObject();
if (allocationIdOnLastStatusUpdate != null) {
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
@ -373,7 +451,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
private boolean stopped = true;
private boolean removeOnCompletion;
private Status status;
private String executorNode;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) {
@ -412,8 +490,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
return this;
}
public TaskBuilder<Request> setExecutorNode(String executorNode) {
this.executorNode = executorNode;
public TaskBuilder<Request> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
}
@ -424,7 +502,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
public PersistentTaskInProgress<Request> build() {
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
executorNode, allocationIdOnLastStatusUpdate);
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -510,22 +588,21 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
*/
public <Request extends PersistentActionRequest> Builder addTask(String action, Request request, boolean stopped,
boolean removeOnCompletion, String executorNode) {
boolean removeOnCompletion, Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion,
executorNode));
tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, assignment));
return this;
}
/**
* Reassigns the task to another node if the task exist
*/
public Builder reassignTask(long taskId, String executorNode) {
public Builder reassignTask(long taskId, Assignment assignment) {
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
}
return this;
}
@ -538,13 +615,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder assignTask(long taskId,
BiFunction<String, Request, String> executorNodeFunc) {
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.getExecutorNode() == null) { // only assign unassigned tasks
String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
if (executorNode != null || taskInProgress.isStopped()) {
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
if (assignment.isAssigned() || taskInProgress.isStopped()) {
changed = true;
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
}
}
return this;
@ -555,12 +632,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> Builder reassignTask(long taskId,
BiFunction<String, Request, String> executorNodeFunc) {
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode));
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
}
return this;
}
@ -599,7 +676,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, null));
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
}
}
return this;

View File

@ -16,6 +16,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -42,14 +43,22 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
persistentActionRegistry.registerPersistentAction(actionName, this);
}
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
/**
* Returns the node id where the request has to be executed,
* <p>
* The default implementation returns the least loaded data node
*/
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode);
public Assignment getAssignment(Request request, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode);
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new Assignment(discoveryNode.getId(), "");
}
}
/**
* Finds the least loaded node that satisfies the selector criteria

View File

@ -35,6 +35,7 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -269,7 +270,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null);
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
@ -331,7 +332,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTaskInProgress<StartDatafeedAction.Request> taskInProgress =
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null);
new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasksInProgress =
new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));

View File

@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.net.InetAddress;
@ -122,12 +123,12 @@ public class OpenJobActionTests extends ESTestCase {
.build();
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L,
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1"));
taskMap.put(1L,
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, "_node_id1"));
taskMap.put(2L,
new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, "_node_id2"));
taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true,
new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true,
new Assignment("_node_id2", "test assignment")));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
@ -157,7 +158,7 @@ public class OpenJobActionTests extends ESTestCase {
for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i);
taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id),
false, true, nodeId));
false, true, new Assignment(nodeId, "test assignment")));
}
}
PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap);
@ -183,7 +184,8 @@ public class OpenJobActionTests extends ESTestCase {
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1");
new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment"));
PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
@ -241,7 +243,7 @@ public class OpenJobActionTests extends ESTestCase {
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because OPENING state", result);
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3"));
taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasksInProgress(6L, taskMap);
csBuilder = ClusterState.builder(cs);
@ -295,7 +297,8 @@ public class OpenJobActionTests extends ESTestCase {
public static PersistentTaskInProgress<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) {
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId);
new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true,
new Assignment(nodeId, "test assignment"));
task = new PersistentTaskInProgress<>(task, jobState);
return task;
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import java.net.InetAddress;
@ -32,6 +33,7 @@ import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionTests extends ESTestCase {
@ -86,7 +88,7 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false)
.build();
PersistentTaskInProgress<OpenJobAction.Request> task =
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, null);
new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, INITIAL_ASSIGNMENT);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task));
DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
@ -112,7 +114,7 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id");
false, true, new Assignment("node_id", "test assignment"));
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
@ -139,7 +141,7 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTaskInProgress<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED);
PersistentTaskInProgress<StartDatafeedAction.Request> datafeedTask =
new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, "node_id1");
false, true, new Assignment("node_id1", "test assignment"));
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTaskInProgress<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask);
@ -148,7 +150,7 @@ public class StartDatafeedActionTests extends ESTestCase {
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);
datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
false, true, null);
false, true, INITIAL_ASSIGNMENT);
datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED);
taskMap.put(1L, datafeedTask);
StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
import java.io.IOException;
@ -75,10 +76,12 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask("test_action", new TestRequest("other_" + i), false, true, "other_node_" + randomInt(nonLocalNodesCount));
tasks.addTask("test_action", new TestRequest("other_" + i), false, true,
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask("test", new TestRequest("this_param"), false, true, "this_node");
tasks.addTask("test", new TestRequest("this_param"), false, true,
new Assignment("this_node", "test assignment on this node"));
}
}
}
@ -288,7 +291,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
PersistentTasksInProgress.Builder builder =
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
builder.addTask(action, request, false, true, node).build())).build();
builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
@ -296,7 +299,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
assertTrue(builder.hasTask(taskId));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
builder.reassignTask(taskId, node).build())).build();
builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState removeTask(ClusterState state, long taskId) {

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
@ -28,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
@ -36,7 +38,7 @@ import static org.hamcrest.Matchers.nullValue;
public class PersistentTaskClusterServiceTests extends ESTestCase {
public void testReassignmentRequired() {
int numberOfIterations = randomIntBetween(10, 100);
int numberOfIterations = randomIntBetween(1, 30);
ClusterState clusterState = initialState();
for (int i = 0; i < numberOfIterations; i++) {
boolean significant = randomBoolean();
@ -48,14 +50,17 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
clusterState = insignificantChange(clusterState);
}
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState);
assertThat(dumpEvent(event), significant, equalTo(PersistentTaskClusterService.reassignmentRequired(event,
assertThat(dumpEvent(event), PersistentTaskClusterService.reassignmentRequired(event,
new PersistentTaskClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentActionRequest> String executorNode(
public <Request extends PersistentActionRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
return randomNode(currentState.nodes());
if ("never_assign".equals(((TestRequest) request).getTestParam())) {
return NO_NODE_FOUND;
}
})));
return randomNodeAssignment(currentState.nodes());
}
}), equalTo(significant));
}
}
@ -128,23 +133,32 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
for (PersistentTaskInProgress<?> task : tasksInProgress.tasks()) {
if (task.isStopped()) {
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getAction()));
} else {
// explanation should correspond to the action name
switch (task.getAction()) {
case "should_assign":
assertThat(task.getExecutorNode(), notNullValue());
assertThat(task.isAssigned(), equalTo(true));
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString());
}
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
break;
case "should_not_assign":
assertThat(task.getExecutorNode(), nullValue());
assertThat(task.isAssigned(), equalTo(false));
assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment"));
break;
case "assign_one":
if (task.getExecutorNode() != null) {
if (task.isAssigned()) {
assignOneCount++;
assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1));
assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
} else {
assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time"));
}
break;
default:
@ -165,14 +179,14 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
return PersistentTaskClusterService.reassignTasks(clusterState, logger,
new PersistentTaskClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentActionRequest> String executorNode(
public <Request extends PersistentActionRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
TestRequest testRequest = (TestRequest) request;
switch (testRequest.getTestParam()) {
case "assign_me":
return randomNode(currentState.nodes());
return randomNodeAssignment(currentState.nodes());
case "dont_assign_me":
return null;
return NO_NODE_FOUND;
case "fail_me_if_called":
fail("the decision decider shouldn't be called on this task");
return null;
@ -181,33 +195,37 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
default:
fail("unknown param " + testRequest.getTestParam());
}
return null;
return NO_NODE_FOUND;
}
});
}
private String assignOnlyOneTaskAtATime(ClusterState clusterState) {
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
DiscoveryNodes nodes = clusterState.nodes();
PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress.findTasks("assign_one",
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
return randomNode(clusterState.nodes());
return randomNodeAssignment(clusterState.nodes());
} else {
return null;
return new Assignment(null, "only one task can be assigned at a time");
}
}
private String randomNode(DiscoveryNodes nodes) {
private Assignment randomNodeAssignment(DiscoveryNodes nodes) {
if (nodes.getNodes().isEmpty()) {
return null;
return NO_NODE_FOUND;
}
List<String> nodeList = new ArrayList<>();
for (ObjectCursor<String> node : nodes.getNodes().keys()) {
nodeList.add(node.value);
}
return randomFrom(nodeList);
String node = randomFrom(nodeList);
if (node != null) {
return new Assignment(node, "test assignment");
} else {
return NO_NODE_FOUND;
}
}
private String dumpEvent(ClusterChangedEvent event) {
@ -222,10 +240,8 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
if (tasks != null) {
if (randomBoolean()) {
//
boolean removedNode = false;
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() != null && clusterState.nodes().nodeExists(task.getExecutorNode())) {
if (task.isAssigned() && clusterState.nodes().nodeExists(task.getExecutorNode())) {
logger.info("removed node {}", task.getExecutorNode());
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode()));
return builder.build();
@ -235,11 +251,18 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
}
boolean tasksOrNodesChanged = false;
// add a new unassigned task
if (hasUnassigned(tasks, clusterState.nodes()) == false) {
if (hasAssignableTasks(tasks, clusterState.nodes()) == false) {
// we don't have any unassigned tasks - add some
if (randomBoolean()) {
logger.info("added random task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false);
tasksOrNodesChanged = true;
} else {
logger.info("added unassignable task with custom assignment message");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks),
new Assignment(null, "change me"), "never_assign", false);
tasksOrNodesChanged = true;
}
}
// add a node if there are unassigned tasks
if (clusterState.nodes().getNodes().isEmpty()) {
@ -259,31 +282,60 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
return builder.build();
}
private PersistentTasksInProgress removeTasksWithChangingAssignment(PersistentTasksInProgress tasks) {
if (tasks != null) {
boolean changed = false;
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
// Remove all unassigned tasks that cause changing assignments they might trigger a significant change
if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) &&
"change me".equals(task.getAssignment().getExplanation())) {
logger.info("removed task with changing assignment {}", task.getId());
tasksBuilder.removeTask(task.getId());
changed = true;
}
}
if (changed) {
return tasksBuilder.build();
}
}
return tasks;
}
private ClusterState insignificantChange(ClusterState clusterState) {
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
tasks = removeTasksWithChangingAssignment(tasks);
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
if (randomBoolean()) {
if (hasUnassigned(tasks, clusterState.nodes()) == false) {
if (hasAssignableTasks(tasks, clusterState.nodes()) == false) {
// we don't have any unassigned tasks - adding a node or changing a routing table shouldn't affect anything
if (randomBoolean()) {
logger.info("added random node");
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10))));
}
if (randomBoolean()) {
logger.info("added random unassignable task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false);
return builder.build();
}
if (randomBoolean()) {
// add unassigned task in stopped state
logger.info("added random stopped task");
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, true);
addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true);
return builder.build();
} else {
logger.info("changed routing table");
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
metaData.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
changeRoutingTable(metaData, routingTable);
builder.metaData(metaData).routingTable(routingTable.build());
}
return builder.build();
}
}
}
if (randomBoolean()) {
// remove a node that doesn't have any tasks assigned to it and it's not the master node
for (DiscoveryNode node : clusterState.nodes()) {
@ -308,10 +360,9 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
}
}
logger.info("removed all unassigned tasks and changed routing table");
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
if (tasks != null) {
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) {
tasksBuilder.removeTask(task.getId());
}
}
@ -327,13 +378,19 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
return builder.metaData(metaData).build();
}
private boolean hasUnassigned(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) {
private boolean hasAssignableTasks(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) {
if (tasks == null || tasks.tasks().isEmpty()) {
return false;
}
return tasks.tasks().stream().anyMatch(task ->
task.isStopped() == false &&
(task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())));
return tasks.tasks().stream().anyMatch(task -> {
if (task.isStopped()) {
return false;
}
if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) {
return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false;
}
return false;
});
}
private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) {
@ -343,14 +400,20 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
String node,
boolean stopped) {
String node, boolean stopped) {
return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)),
randomAsciiOfLength(10), stopped);
}
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
Assignment assignment, String param, boolean stopped) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE,
tasks.addTask(randomAsciiOfLength(10), new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), node).build()));
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build()));
}
private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) {
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), node);
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Builder;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status;
@ -34,6 +35,7 @@ import java.util.Collections;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND;
public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase<Custom> {
@ -42,8 +44,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
int numberOfTasks = randomInt(10);
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
for (int i = 0; i < numberOfTasks; i++) {
boolean stopped = randomBoolean();
tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
randomBoolean(), randomBoolean(), randomAsciiOfLength(10));
stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10)));
@ -79,7 +82,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
if (tasksInProgress.tasks().isEmpty()) {
addRandomTask(builder);
} else {
builder.reassignTask(pickRandomTask(tasksInProgress), randomAsciiOfLength(10));
builder.reassignTask(pickRandomTask(tasksInProgress), randomAssignment());
}
break;
case 2:
@ -133,8 +136,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
}
private Builder addRandomTask(Builder builder) {
builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
randomBoolean(), randomBoolean(), randomAsciiOfLength(10));
boolean stopped = randomBoolean();
builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(),
stopped ? new Assignment(null, "stopped") : randomAssignment());
return builder;
}
@ -210,9 +214,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
changed = true;
}
if (randomBoolean()) {
builder.reassignTask(lastKnownTask, randomAsciiOfLength(10));
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
builder.reassignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment());
}
break;
case 2:
@ -220,22 +224,23 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
PersistentTaskInProgress<?> task = builder.build().getTask(lastKnownTask);
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getExecutorNode());
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
// should change if the task was stopped AND unassigned
if (task.getExecutorNode() == null && task.isStopped()) {
changed = true;
}
} else {
// Trying to reassign to a different node
builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
// should change if the task was unassigned
if (task.getExecutorNode() == null) {
Assignment randomAssignment = randomAssignment();
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment);
// should change if the task was unassigned and was reassigned to a different node or started
if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) {
changed = true;
}
}
} else {
// task doesn't exist - shouldn't change
builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10));
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment());
}
break;
case 3:
@ -264,4 +269,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
}
private Assignment randomAssignment() {
if (randomBoolean()) {
if (randomBoolean()) {
return NO_NODE_FOUND;
} else {
return new Assignment(null, randomAsciiOfLength(10));
}
}
return new Assignment(randomAsciiOfLength(10), randomAsciiOfLength(10));
}
}

View File

@ -49,6 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment;
import java.io.IOException;
import java.util.ArrayList;
@ -351,12 +352,18 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
}
@Override
public DiscoveryNode executorNode(TestRequest request, ClusterState clusterState) {
public Assignment getAssignment(TestRequest request, ClusterState clusterState) {
if (request.getExecutorNodeAttr() == null) {
return super.executorNode(request, clusterState);
return super.getAssignment(request, clusterState);
} else {
return selectLeastLoadedNode(clusterState,
DiscoveryNode executorNode = selectLeastLoadedNode(clusterState,
discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
if (executorNode != null) {
return new Assignment(executorNode.getId(), "test assignment");
} else {
return NO_NODE_FOUND;
}
}
}