diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 3f01f6aa3b0..f9a976ca7ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TransportPersistentAction; @@ -306,8 +307,14 @@ public class OpenJobAction extends Action task = MlMetadata.getJobTask(jobId, tasks); JobState jobState = MlMetadata.getJobState(jobId, tasks); if (task != null && jobState == JobState.OPENED) { - if (task.getExecutorNode() == null) { + if (task.isAssigned() == false) { // We can skip the job state check below, because the task got unassigned after we went into // opened state on a node that disappeared and we didn't have the opportunity to set the status to failed return; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index ac3e848ff5d..2aca79346d5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TransportPersistentAction; @@ -313,8 +314,14 @@ public class StartDatafeedAction } @Override - public DiscoveryNode executorNode(Request request, ClusterState clusterState) { - return selectNode(logger, request, clusterState); + public Assignment getAssignment(Request request, ClusterState clusterState) { + DiscoveryNode discoveryNode = selectNode(logger, request, clusterState); + // TODO: Add proper explanation + if (discoveryNode == null) { + return NO_NODE_FOUND; + } else { + return new Assignment(discoveryNode.getId(), ""); + } } @Override @@ -359,7 +366,7 @@ public class StartDatafeedAction PersistentTaskInProgress datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); if (datafeedTask != null && datafeedState == DatafeedState.STARTED) { - if (datafeedTask.getExecutorNode() == null) { + if (datafeedTask.isAssigned() == false) { // We can skip the datafeed state check below, because the task got unassigned after we went into // started state on a node that disappeared and we didn't have the opportunity to set the status to stopped return; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 75e4ce38737..2ee2b32dcad 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -65,7 +65,7 @@ public abstract class TransportJobTaskAction jobTask = MlMetadata.getJobTask(jobId, tasks); - if (jobTask == null || jobTask.getExecutorNode() == null) { + if (jobTask == null || jobTask.isAssigned() == false) { listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); } else { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java index 465695b9c4d..b39aa3d31ef 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java @@ -20,11 +20,10 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; /** * Component that runs only on the master node and is responsible for assigning running tasks to nodes @@ -55,13 +54,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - final String executorNodeId; + final Assignment assignment; if (stopped) { - executorNodeId = null; // the task is stopped no need to assign it anywhere yet + assignment = PersistentTasksInProgress.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere } else { - executorNodeId = executorNode(action, currentState, request); + assignment = getAssignement(action, currentState, request); } - return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, executorNodeId)); + return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment)); } @Override @@ -100,7 +99,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C if (tasksInProgress.hasTask(id)) { if (failure != null) { // If the task failed - we need to restart it on another node, otherwise we just remove it - tasksInProgress.reassignTask(id, (action, request) -> executorNode(action, currentState, request)); + tasksInProgress.reassignTask(id, (action, request) -> getAssignement(action, currentState, request)); } else { tasksInProgress.finishTask(id); } @@ -137,7 +136,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress - .assignTask(id, (action, request) -> executorNode(action, currentState, request))); + .assignTask(id, (action, request) -> getAssignement(action, currentState, request))); } else { throw new ResourceNotFoundException("the task with id {} doesn't exist", id); } @@ -216,26 +215,17 @@ public class PersistentTaskClusterService extends AbstractComponent implements C }); } - private String executorNode(String action, ClusterState currentState, Request request) { + private Assignment getAssignement(String action, ClusterState currentState, Request request) { TransportPersistentAction persistentAction = registry.getPersistentActionSafe(action); persistentAction.validate(request, currentState); - DiscoveryNode executorNode = persistentAction.executorNode(request, currentState); - final String executorNodeId; - if (executorNode == null) { - // The executor node not available yet, we will create task with empty executor node and try - // again later - executorNodeId = null; - } else { - executorNodeId = executorNode.getId(); - } - return executorNodeId; + return persistentAction.getAssignment(request, currentState); } @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); - if (reassignmentRequired(event, this::executorNode)) { + if (reassignmentRequired(event, this::getAssignement)) { logger.trace("task reassignment is needed"); reassignTasks(); } else { @@ -245,7 +235,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C } interface ExecutorNodeDecider { - String executorNode(String action, ClusterState currentState, Request request); + Assignment getAssignment(String action, ClusterState currentState, Request request); } static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { @@ -257,18 +247,16 @@ public class PersistentTaskClusterService extends AbstractComponent implements C event.previousState().nodes().isLocalNodeElectedMaster() == false)) { // We need to check if removed nodes were running any of the tasks and reassign them boolean reassignmentRequired = false; - Set removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { - if (taskInProgress.isStopped() == false) { // skipping stopped tasks - if (taskInProgress.getExecutorNode() == null || removedNodes.contains(taskInProgress.getExecutorNode())) { - // there is an unassigned task or task with a disappeared node - we need to try assigning it - if (Objects.equals(taskInProgress.getRequest(), - decider.executorNode(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) { - // it looks like a assignment for at least one task is possible - let's trigger reassignment - reassignmentRequired = true; - break; - } + if (taskInProgress.needsReassignment(event.state().nodes())) { + // there is an unassigned task or task with a disappeared node - we need to try assigning it + if (Objects.equals(taskInProgress.getAssignment(), + decider.getAssignment(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) { + // it looks like a assignment for at least one task is possible - let's trigger reassignment + reassignmentRequired = true; + break; } + } } return reassignmentRequired; @@ -283,7 +271,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return reassignTasks(currentState, logger, PersistentTaskClusterService.this::executorNode); + return reassignTasks(currentState, logger, PersistentTaskClusterService.this::getAssignement); } @Override @@ -306,16 +294,15 @@ public class PersistentTaskClusterService extends AbstractComponent implements C logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTaskInProgress task : tasks.tasks()) { - if (task.isStopped() == false && - (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false)) { + if (task.needsReassignment(nodes)) { // there is an unassigned task - we need to try assigning it - String executorNode = decider.executorNode(task.getAction(), clusterState, task.getRequest()); - if (Objects.equals(executorNode, task.getExecutorNode()) == false) { + Assignment assignment = decider.getAssignment(task.getAction(), clusterState, task.getRequest()); + if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), - task.getExecutorNode(), executorNode); - clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), executorNode)); + task.getAssignment().getExecutorNode(), assignment.getExecutorNode()); + clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), assignment)); } else { - logger.trace("ignoring task {} because executor nodes are the same {}", task.getId(), executorNode); + logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment); } } else { if (task.isStopped()) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java index a94486e57f0..37c3ff1bafc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java @@ -9,12 +9,14 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser; import org.elasticsearch.common.xcontent.ToXContent; @@ -36,6 +38,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.MetaData.ALL_CONTEXTS; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /** * A cluster state record that contains a list of all running persistent tasks @@ -61,6 +64,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER = new ObjectParser<>("running_tasks", TaskBuilder::new); + public static final ConstructingObjectParser ASSIGNMENT_PARSER = + new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1])); + public static final NamedObjectParser, Void> ACTION_PARSER; static { @@ -77,6 +83,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable p.namedObject(Status.class, c, null), new ParseField("status")); ACTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new ActionDescriptionBuilder<>(name), name); + // Assignment parser + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node")); + ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); + // Task parser initialization PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id")); PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); @@ -92,7 +102,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable action.equals(task.action) && nodeId.equals(task.executorNode)).count(); + return tasks.values().stream().filter(task -> action.equals(task.action) && nodeId.equals(task.assignment.executorNode)).count(); } @Override @@ -182,6 +192,54 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable task, boolean stopped, String newExecutorNode) { + public PersistentTaskInProgress(PersistentTaskInProgress task, boolean stopped, Assignment assignment) { this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, - newExecutorNode, task.allocationId); + assignment, task.allocationId); } public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, - task.executorNode, task.allocationId); + task.assignment, task.allocationId); } private PersistentTaskInProgress(long id, long allocationId, String action, Request request, boolean stopped, boolean removeOnCompletion, Status status, - String executorNode, Long allocationIdOnLastStatusUpdate) { + Assignment assignment, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.action = action; @@ -225,7 +282,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setId(long id) { @@ -412,8 +490,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable setExecutorNode(String executorNode) { - this.executorNode = executorNode; + public TaskBuilder setAssignment(Assignment assignment) { + this.assignment = assignment; return this; } @@ -424,7 +502,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable build() { return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, - executorNode, allocationIdOnLastStatusUpdate); + assignment, allocationIdOnLastStatusUpdate); } } @@ -510,22 +588,21 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder addTask(String action, Request request, boolean stopped, - boolean removeOnCompletion, String executorNode) { + boolean removeOnCompletion, Assignment assignment) { changed = true; currentId++; - tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, - executorNode)); + tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, assignment)); return this; } /** * Reassigns the task to another node if the task exist */ - public Builder reassignTask(long taskId, String executorNode) { + public Builder reassignTask(long taskId, Assignment assignment) { PersistentTaskInProgress taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); } return this; } @@ -538,13 +615,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder assignTask(long taskId, - BiFunction executorNodeFunc) { + BiFunction executorNodeFunc) { PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); - if (taskInProgress != null && taskInProgress.getExecutorNode() == null) { // only assign unassigned tasks - String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); - if (executorNode != null || taskInProgress.isStopped()) { + if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks + Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + if (assignment.isAssigned() || taskInProgress.isStopped()) { changed = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); } } return this; @@ -555,12 +632,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder reassignTask(long taskId, - BiFunction executorNodeFunc) { + BiFunction executorNodeFunc) { PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); if (taskInProgress != null) { changed = true; - String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); } return this; } @@ -599,7 +676,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable(taskInProgress, true, null)); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT)); } } return this; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java index af74346514d..f04d22c5acf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import java.util.function.Predicate; import java.util.function.Supplier; @@ -42,15 +43,23 @@ public abstract class TransportPersistentAction * The default implementation returns the least loaded data node */ - public DiscoveryNode executorNode(Request request, ClusterState clusterState) { - return selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode); + public Assignment getAssignment(Request request, ClusterState clusterState) { + DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode); + if (discoveryNode == null) { + return NO_NODE_FOUND; + } else { + return new Assignment(discoveryNode.getId(), ""); + } } + /** * Finds the least loaded node that satisfies the selector criteria */ diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 667f7ea5ebc..5220a1121a0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -35,6 +35,7 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; +import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -269,7 +270,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null); + new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); @@ -331,7 +332,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null); + new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index abb3856c237..17accdc5fe2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import java.net.InetAddress; @@ -122,12 +123,12 @@ public class OpenJobActionTests extends ESTestCase { .build(); Map> taskMap = new HashMap<>(); - taskMap.put(0L, - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1")); - taskMap.put(1L, - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, "_node_id1")); - taskMap.put(2L, - new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, "_node_id2")); + taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, + new Assignment("_node_id1", "test assignment"))); + taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, + new Assignment("_node_id1", "test assignment"))); + taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, + new Assignment("_node_id2", "test assignment"))); PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); @@ -157,7 +158,7 @@ public class OpenJobActionTests extends ESTestCase { for (int j = 0; j < maxRunningJobsPerNode; j++) { long id = j + (maxRunningJobsPerNode * i); taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id), - false, true, nodeId)); + false, true, new Assignment(nodeId, "test assignment"))); } } PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap); @@ -183,7 +184,8 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1"); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, + new Assignment("_node_id1", "test assignment")); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); @@ -241,7 +243,7 @@ public class OpenJobActionTests extends ESTestCase { result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because OPENING state", result); - taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, "_node_id3")); + taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, new Assignment("_node_id3", "test assignment"))); tasks = new PersistentTasksInProgress(6L, taskMap); csBuilder = ClusterState.builder(cs); @@ -295,7 +297,8 @@ public class OpenJobActionTests extends ESTestCase { public static PersistentTaskInProgress createJobTask(long id, String jobId, String nodeId, JobState jobState) { PersistentTaskInProgress task = - new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, nodeId); + new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, + new Assignment(nodeId, "test assignment")); task = new PersistentTaskInProgress<>(task, jobState); return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 90cd2d1a97c..855f6b3f49b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import java.net.InetAddress; @@ -32,6 +33,7 @@ import java.util.Map; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; +import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; public class StartDatafeedActionTests extends ESTestCase { @@ -86,7 +88,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, null); + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, INITIAL_ASSIGNMENT); PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) @@ -112,7 +114,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, "node_id"); + false, true, new Assignment("node_id", "test assignment")); datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); @@ -139,7 +141,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, "node_id1"); + false, true, new Assignment("node_id1", "test assignment")); datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); @@ -148,7 +150,7 @@ public class StartDatafeedActionTests extends ESTestCase { StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, null); + false, true, INITIAL_ASSIGNMENT); datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); taskMap.put(1L, datafeedTask); StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java index 062f2306268..3095bd431cc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; import java.io.IOException; @@ -75,10 +76,12 @@ public class PersistentActionCoordinatorTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask("test_action", new TestRequest("other_" + i), false, true, "other_node_" + randomInt(nonLocalNodesCount)); + tasks.addTask("test_action", new TestRequest("other_" + i), false, true, + new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask("test", new TestRequest("this_param"), false, true, "this_node"); + tasks.addTask("test", new TestRequest("this_param"), false, true, + new Assignment("this_node", "test assignment on this node")); } } } @@ -288,7 +291,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase { PersistentTasksInProgress.Builder builder = PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, - builder.addTask(action, request, false, true, node).build())).build(); + builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) { @@ -296,7 +299,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase { PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); assertTrue(builder.hasTask(taskId)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, - builder.reassignTask(taskId, node).build())).build(); + builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build(); } private ClusterState removeTask(ClusterState state, long taskId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java index 0efd42ddfc4..d27a68fc14b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; @@ -28,6 +29,7 @@ import java.util.HashSet; import java.util.List; import static java.util.Collections.emptyMap; +import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -36,7 +38,7 @@ import static org.hamcrest.Matchers.nullValue; public class PersistentTaskClusterServiceTests extends ESTestCase { public void testReassignmentRequired() { - int numberOfIterations = randomIntBetween(10, 100); + int numberOfIterations = randomIntBetween(1, 30); ClusterState clusterState = initialState(); for (int i = 0; i < numberOfIterations; i++) { boolean significant = randomBoolean(); @@ -48,14 +50,17 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { clusterState = insignificantChange(clusterState); } ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); - assertThat(dumpEvent(event), significant, equalTo(PersistentTaskClusterService.reassignmentRequired(event, + assertThat(dumpEvent(event), PersistentTaskClusterService.reassignmentRequired(event, new PersistentTaskClusterService.ExecutorNodeDecider() { @Override - public String executorNode( + public Assignment getAssignment( String action, ClusterState currentState, Request request) { - return randomNode(currentState.nodes()); + if ("never_assign".equals(((TestRequest) request).getTestParam())) { + return NO_NODE_FOUND; + } + return randomNodeAssignment(currentState.nodes()); } - }))); + }), equalTo(significant)); } } @@ -128,23 +133,32 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { for (PersistentTaskInProgress task : tasksInProgress.tasks()) { if (task.isStopped()) { assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); + assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getAction())); } else { + // explanation should correspond to the action name switch (task.getAction()) { case "should_assign": assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString()); } assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); break; case "should_not_assign": assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); break; case "assign_one": - if (task.getExecutorNode() != null) { + if (task.isAssigned()) { assignOneCount++; assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + } else { + assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time")); } break; default: @@ -165,14 +179,14 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { return PersistentTaskClusterService.reassignTasks(clusterState, logger, new PersistentTaskClusterService.ExecutorNodeDecider() { @Override - public String executorNode( + public Assignment getAssignment( String action, ClusterState currentState, Request request) { TestRequest testRequest = (TestRequest) request; switch (testRequest.getTestParam()) { case "assign_me": - return randomNode(currentState.nodes()); + return randomNodeAssignment(currentState.nodes()); case "dont_assign_me": - return null; + return NO_NODE_FOUND; case "fail_me_if_called": fail("the decision decider shouldn't be called on this task"); return null; @@ -181,33 +195,37 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { default: fail("unknown param " + testRequest.getTestParam()); } - return null; + return NO_NODE_FOUND; } }); } - private String assignOnlyOneTaskAtATime(ClusterState clusterState) { + private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { DiscoveryNodes nodes = clusterState.nodes(); PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); if (tasksInProgress.findTasks("assign_one", task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { - return randomNode(clusterState.nodes()); + return randomNodeAssignment(clusterState.nodes()); } else { - return null; + return new Assignment(null, "only one task can be assigned at a time"); } } - private String randomNode(DiscoveryNodes nodes) { + private Assignment randomNodeAssignment(DiscoveryNodes nodes) { if (nodes.getNodes().isEmpty()) { - return null; + return NO_NODE_FOUND; } List nodeList = new ArrayList<>(); for (ObjectCursor node : nodes.getNodes().keys()) { nodeList.add(node.value); } - return randomFrom(nodeList); - + String node = randomFrom(nodeList); + if (node != null) { + return new Assignment(node, "test assignment"); + } else { + return NO_NODE_FOUND; + } } private String dumpEvent(ClusterChangedEvent event) { @@ -222,10 +240,8 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); if (tasks != null) { if (randomBoolean()) { - // - boolean removedNode = false; for (PersistentTaskInProgress task : tasks.tasks()) { - if (task.getExecutorNode() != null && clusterState.nodes().nodeExists(task.getExecutorNode())) { + if (task.isAssigned() && clusterState.nodes().nodeExists(task.getExecutorNode())) { logger.info("removed node {}", task.getExecutorNode()); builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode())); return builder.build(); @@ -235,11 +251,18 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { } boolean tasksOrNodesChanged = false; // add a new unassigned task - if (hasUnassigned(tasks, clusterState.nodes()) == false) { + if (hasAssignableTasks(tasks, clusterState.nodes()) == false) { // we don't have any unassigned tasks - add some - logger.info("added random task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false); - tasksOrNodesChanged = true; + if (randomBoolean()) { + logger.info("added random task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false); + tasksOrNodesChanged = true; + } else { + logger.info("added unassignable task with custom assignment message"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), + new Assignment(null, "change me"), "never_assign", false); + tasksOrNodesChanged = true; + } } // add a node if there are unassigned tasks if (clusterState.nodes().getNodes().isEmpty()) { @@ -259,29 +282,58 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { return builder.build(); } + private PersistentTasksInProgress removeTasksWithChangingAssignment(PersistentTasksInProgress tasks) { + if (tasks != null) { + boolean changed = false; + PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); + for (PersistentTaskInProgress task : tasks.tasks()) { + // Remove all unassigned tasks that cause changing assignments they might trigger a significant change + if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) && + "change me".equals(task.getAssignment().getExplanation())) { + logger.info("removed task with changing assignment {}", task.getId()); + tasksBuilder.removeTask(task.getId()); + changed = true; + } + } + if (changed) { + return tasksBuilder.build(); + } + } + return tasks; + } + private ClusterState insignificantChange(ClusterState clusterState) { ClusterState.Builder builder = ClusterState.builder(clusterState); PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + tasks = removeTasksWithChangingAssignment(tasks); + PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); + if (randomBoolean()) { - if (hasUnassigned(tasks, clusterState.nodes()) == false) { + if (hasAssignableTasks(tasks, clusterState.nodes()) == false) { // we don't have any unassigned tasks - adding a node or changing a routing table shouldn't affect anything if (randomBoolean()) { logger.info("added random node"); builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10)))); } + if (randomBoolean()) { + logger.info("added random unassignable task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false); + return builder.build(); + } if (randomBoolean()) { // add unassigned task in stopped state logger.info("added random stopped task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, true); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true); return builder.build(); } else { logger.info("changed routing table"); MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + metaData.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build()); RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); changeRoutingTable(metaData, routingTable); builder.metaData(metaData).routingTable(routingTable.build()); + return builder.build(); } - return builder.build(); } } if (randomBoolean()) { @@ -308,10 +360,9 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { } } logger.info("removed all unassigned tasks and changed routing table"); - PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); if (tasks != null) { for (PersistentTaskInProgress task : tasks.tasks()) { - if (task.getExecutorNode() == null) { + if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) { tasksBuilder.removeTask(task.getId()); } } @@ -327,13 +378,19 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { return builder.metaData(metaData).build(); } - private boolean hasUnassigned(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) { + private boolean hasAssignableTasks(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) { if (tasks == null || tasks.tasks().isEmpty()) { return false; } - return tasks.tasks().stream().anyMatch(task -> - task.isStopped() == false && - (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode()))); + return tasks.tasks().stream().anyMatch(task -> { + if (task.isStopped()) { + return false; + } + if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) { + return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false; + } + return false; + }); } private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) { @@ -343,14 +400,20 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, - String node, - boolean stopped) { + String node, boolean stopped) { + return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)), + randomAsciiOfLength(10), stopped); + } + + private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, + MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, + Assignment assignment, String param, boolean stopped) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE, - tasks.addTask(randomAsciiOfLength(10), new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), node).build())); + tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build())); } private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) { - tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), node); + tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action)); } private DiscoveryNode newNode(String nodeId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java index 9912b38f10f..ed63894bf9a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Builder; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status; @@ -34,6 +35,7 @@ import java.util.Collections; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; +import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND; public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase { @@ -42,8 +44,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio int numberOfTasks = randomInt(10); PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); for (int i = 0; i < numberOfTasks; i++) { + boolean stopped = randomBoolean(); tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), - randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); + stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment()); if (randomBoolean()) { // From time to time update status tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10))); @@ -79,7 +82,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio if (tasksInProgress.tasks().isEmpty()) { addRandomTask(builder); } else { - builder.reassignTask(pickRandomTask(tasksInProgress), randomAsciiOfLength(10)); + builder.reassignTask(pickRandomTask(tasksInProgress), randomAssignment()); } break; case 2: @@ -133,8 +136,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio } private Builder addRandomTask(Builder builder) { - builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), - randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); + boolean stopped = randomBoolean(); + builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), + stopped ? new Assignment(null, "stopped") : randomAssignment()); return builder; } @@ -210,9 +214,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio changed = true; } if (randomBoolean()) { - builder.reassignTask(lastKnownTask, randomAsciiOfLength(10)); + builder.reassignTask(lastKnownTask, randomAssignment()); } else { - builder.reassignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + builder.reassignTask(lastKnownTask, (s, request) -> randomAssignment()); } break; case 2: @@ -220,22 +224,23 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio PersistentTaskInProgress task = builder.build().getTask(lastKnownTask); if (randomBoolean()) { // Trying to reassign to the same node - builder.assignTask(lastKnownTask, (s, request) -> task.getExecutorNode()); + builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment()); // should change if the task was stopped AND unassigned if (task.getExecutorNode() == null && task.isStopped()) { changed = true; } } else { // Trying to reassign to a different node - builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); - // should change if the task was unassigned - if (task.getExecutorNode() == null) { + Assignment randomAssignment = randomAssignment(); + builder.assignTask(lastKnownTask, (s, request) -> randomAssignment); + // should change if the task was unassigned and was reassigned to a different node or started + if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) { changed = true; } } } else { // task doesn't exist - shouldn't change - builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + builder.assignTask(lastKnownTask, (s, request) -> randomAssignment()); } break; case 3: @@ -264,4 +269,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio } + private Assignment randomAssignment() { + if (randomBoolean()) { + if (randomBoolean()) { + return NO_NODE_FOUND; + } else { + return new Assignment(null, randomAsciiOfLength(10)); + } + } + return new Assignment(randomAsciiOfLength(10), randomAsciiOfLength(10)); + } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java index f2c86f5c0e0..3cde981a7c0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java @@ -49,6 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; import java.io.IOException; import java.util.ArrayList; @@ -351,12 +352,18 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { } @Override - public DiscoveryNode executorNode(TestRequest request, ClusterState clusterState) { + public Assignment getAssignment(TestRequest request, ClusterState clusterState) { if (request.getExecutorNodeAttr() == null) { - return super.executorNode(request, clusterState); + return super.getAssignment(request, clusterState); } else { - return selectLeastLoadedNode(clusterState, + DiscoveryNode executorNode = selectLeastLoadedNode(clusterState, discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr"))); + if (executorNode != null) { + return new Assignment(executorNode.getId(), "test assignment"); + } else { + return NO_NODE_FOUND; + } + } }