diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java index 3f1a059cba0..223064e6128 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java @@ -60,10 +60,6 @@ public class CreatePersistentTaskAction extends Action { @@ -226,7 +184,7 @@ public class CreatePersistentTaskAction extends Action listener) { - persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion, + persistentTasksClusterService.createPersistentTask(request.action, request.request, new ActionListener() { @Override public void onResponse(Long newTaskId) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index 68091359b70..21254571322 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -47,21 +47,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param request request * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String action, Request request, boolean stopped, - boolean removeOnCompletion, + public void createPersistentTask(String action, Request request, ActionListener listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { validate(action, clusterService.state(), request); final Assignment assignment; - if (stopped) { - // the task is stopped no need to assign it anywhere - assignment = PersistentTasksCustomMetaData.FINISHED_TASK_ASSIGNMENT; - } else { - assignment = getAssignement(action, currentState, request); - } - return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment)); + assignment = getAssignement(action, currentState, request); + return update(currentState, builder(currentState).addTask(action, request, assignment)); } @Override @@ -305,11 +299,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment); } } else { - if (task.isStopped()) { - logger.trace("ignoring task {} because it is stopped", task.getId()); - } else { - logger.trace("ignoring task {} because it is still running", task.getId()); - } + logger.trace("ignoring task {} because it is still running", task.getId()); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 0b7727fca01..1da7a6f19b7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -84,8 +84,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskBuilder, List objects) -> { if (objects.size() != 1) { @@ -227,8 +225,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable task, boolean stopped, Assignment assignment) { - this(task.id, task.allocationId + 1L, task.taskName, task.request, stopped, task.removeOnCompletion, task.status, + public PersistentTask(PersistentTask task, Assignment assignment) { + this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status, assignment, task.allocationId); } public PersistentTask(PersistentTask task, Status status) { - this(task.id, task.allocationId, task.taskName, task.request, task.stopped, task.removeOnCompletion, status, + this(task.id, task.allocationId, task.taskName, task.request, status, task.assignment, task.allocationId); } - private PersistentTask(long id, long allocationId, String taskName, Request request, boolean stopped, boolean removeOnCompletion, + private PersistentTask(long id, long allocationId, String taskName, Request request, Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.taskName = taskName; this.request = request; this.status = status; - this.stopped = stopped; - this.removeOnCompletion = removeOnCompletion; this.assignment = assignment; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; // Update parent request for starting tasks with correct parent task ID @@ -271,8 +265,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable setStopped(boolean stopped) { - this.stopped = stopped; - return this; - } - - public TaskBuilder setRemoveOnCompletion(boolean removeOnCompletion) { - this.removeOnCompletion = removeOnCompletion; - return this; - } - public TaskBuilder setAssignment(Assignment assignment) { this.assignment = assignment; return this; @@ -479,7 +445,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable build() { - return new PersistentTask<>(id, allocationId, taskName, request, stopped, removeOnCompletion, status, + return new PersistentTask<>(id, allocationId, taskName, request, status, assignment, allocationIdOnLastStatusUpdate); } } @@ -565,11 +531,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable * After the task is added its id can be found by calling {{@link #getCurrentId()}} method. */ - public Builder addTask(String taskName, Request request, boolean stopped, - boolean removeOnCompletion, Assignment assignment) { + public Builder addTask(String taskName, Request request, Assignment assignment) { changed = true; currentId++; - tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, stopped, removeOnCompletion, assignment)); + tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment)); return this; } @@ -580,7 +545,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); } return this; } @@ -596,9 +561,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = (PersistentTask) tasks.get(taskId); if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request); - if (assignment.isAssigned() || taskInProgress.isStopped()) { + if (assignment.isAssigned()) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); } } return this; @@ -636,11 +601,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - if (taskInProgress.removeOnCompletion) { - tasks.remove(taskId); - } else { - tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT)); - } + tasks.remove(taskId); } return this; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index e636393d751..23d76a7cad6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -29,27 +29,14 @@ public class PersistentTasksService extends AbstractComponent { this.clusterService = clusterService; } - /** - * Creates the specified persistent action and tries to start it immediately, upon completion the task is - * removed from the cluster state - */ - public void createPersistentActionTask(String action, Request request, - PersistentTaskOperationListener listener) { - createPersistentActionTask(action, request, false, true, listener); - } - /** * Creates the specified persistent action. The action is started unless the stopped parameter is equal to true. * If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion. * Otherwise it will remain there in the stopped state. */ public void createPersistentActionTask(String action, Request request, - boolean stopped, - boolean removeOnCompletion, PersistentTaskOperationListener listener) { CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request); - createPersistentActionRequest.setStopped(stopped); - createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion); try { client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( o -> listener.onResponse(o.getTaskId()), listener::onFailure)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 8d5f2720cd6..a150ce30d97 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -270,7 +270,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); PersistentTask taskInProgress = - new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); + new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); @@ -332,7 +332,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); PersistentTask taskInProgress = - new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); + new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index 44014744c62..1fd9a2030b3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -104,7 +104,7 @@ public class CloseJobActionTests extends ESTestCase { public static PersistentTask createDatafeedTask(long id, String datafeedId, long startTime, String nodeId, DatafeedState datafeedState) { PersistentTask task = - new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), false, true, + new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); task = new PersistentTask<>(task, datafeedState); return task; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 7a72018d528..b760ca693ae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; @@ -118,12 +119,9 @@ public class OpenJobActionTests extends ESTestCase { .build(); Map> taskMap = new HashMap<>(); - taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, - new Assignment("_node_id1", "test assignment"))); - taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, - new Assignment("_node_id1", "test assignment"))); - taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, - new Assignment("_node_id2", "test assignment"))); + taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new Assignment("_node_id1", "test assignment"))); + taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), new Assignment("_node_id1", "test assignment"))); + taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), new Assignment("_node_id2", "test assignment"))); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); @@ -180,7 +178,7 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTask task = - new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, + new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new Assignment("_node_id1", "test assignment")); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); @@ -241,7 +239,7 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTask<>(lastTask, false, new Assignment("_node_id3", "test assignment"))); + taskMap.put(5L, new PersistentTask<>(lastTask, new Assignment("_node_id3", "test assignment"))); tasks = new PersistentTasksCustomMetaData(6L, taskMap); csBuilder = ClusterState.builder(cs); @@ -251,7 +249,7 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTask<>(lastTask, null)); + taskMap.put(5L, new PersistentTask<>(lastTask, (Task.Status) null)); tasks = new PersistentTasksCustomMetaData(6L, taskMap); csBuilder = ClusterState.builder(cs); @@ -300,8 +298,7 @@ public class OpenJobActionTests extends ESTestCase { public static PersistentTask createJobTask(long id, String jobId, String nodeId, JobState jobState) { PersistentTask task = - new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, - new Assignment(nodeId, "test assignment")); + new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment")); task = new PersistentTask<>(task, jobState); return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index a6a5eceb0c9..c3f4f4020f9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -126,8 +126,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); PersistentTask task = - new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, - INITIAL_ASSIGNMENT); + new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)); DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) @@ -153,7 +152,7 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTask jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTask datafeedTask = new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - false, true, new Assignment("node_id", "test assignment")); + new Assignment("node_id", "test assignment")); datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 7b114e8cf38..2a1e3a79454 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -48,7 +48,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe public void testValidate() { PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksCustomMetaData.Assignment("node_id", "")); + new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); @@ -69,7 +69,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe PersistentTasksCustomMetaData tasks; if (randomBoolean()) { PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksCustomMetaData.Assignment("node_id", "")); + new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STOPPED); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); } else { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java index 456bdbbfe5a..b3b1f3ceaf4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java @@ -78,7 +78,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(2, 40); for (int i = 0; i < numberOfTasks; i++) { - addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits"); } MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); @@ -99,21 +99,17 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(0, 40); for (int i = 0; i < numberOfTasks; i++) { - switch (randomInt(3)) { + switch (randomInt(2)) { case 0: // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned - addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits"); break; case 1: // add a task assigned to non-existing node that should not get assigned - addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits"); break; case 2: - // add a stopped task assigned to non-existing node that should not get assigned - addTask(tasks, "should_not_assign", "fail_me_if_called", null, true); - break; - case 3: - addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits"); break; } @@ -131,39 +127,34 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { int assignOneCount = 0; for (PersistentTask task : tasksInProgress.tasks()) { - if (task.isStopped()) { - assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); - assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getTaskName())); - } else { - // explanation should correspond to the action name - switch (task.getTaskName()) { - case "should_assign": - assertThat(task.getExecutorNode(), notNullValue()); - assertThat(task.isAssigned(), equalTo(true)); - if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { - logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString()); - } - assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), - clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + // explanation should correspond to the action name + switch (task.getTaskName()) { + case "should_assign": + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { + logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString()); + } + assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), + clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + break; + case "should_not_assign": + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); + break; + case "assign_one": + if (task.isAssigned()) { + assignOneCount++; + assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); - break; - case "should_not_assign": - assertThat(task.getExecutorNode(), nullValue()); - assertThat(task.isAssigned(), equalTo(false)); - assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); - break; - case "assign_one": - if (task.isAssigned()) { - assignOneCount++; - assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); - assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); - } else { - assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time")); - } - break; - default: - fail("Unknown action " + task.getTaskName()); - } + } else { + assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time")); + } + break; + default: + fail("Unknown action " + task.getTaskName()); } } } @@ -204,8 +195,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { DiscoveryNodes nodes = clusterState.nodes(); PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasksInProgress.findTasks("assign_one", - task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { + if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) { return randomNodeAssignment(clusterState.nodes()); } else { return new Assignment(null, "only one task can be assigned at a time"); @@ -255,13 +245,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { // we don't have any unassigned tasks - add some if (randomBoolean()) { logger.info("added random task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null, - false); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null); tasksOrNodesChanged = true; } else { logger.info("added unassignable task with custom assignment message"); addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), - new Assignment(null, "change me"), "never_assign", false); + new Assignment(null, "change me"), "never_assign"); tasksOrNodesChanged = true; } } @@ -318,23 +307,16 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } if (randomBoolean()) { logger.info("added random unassignable task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false); - return builder.build(); - } - if (randomBoolean()) { - // add unassigned task in stopped state - logger.info("added random stopped task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true); - return builder.build(); - } else { - logger.info("changed routing table"); - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); - metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()); - RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); - changeRoutingTable(metaData, routingTable); - builder.metaData(metaData).routingTable(routingTable.build()); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign"); return builder.build(); } + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + return builder.build(); } } if (randomBoolean()) { @@ -384,9 +366,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { return false; } return tasks.tasks().stream().anyMatch(task -> { - if (task.isStopped()) { - return false; - } if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) { return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false; } @@ -401,20 +380,20 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, - String node, boolean stopped) { + String node) { return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)), - randomAsciiOfLength(10), stopped); + randomAsciiOfLength(10)); } private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, - Assignment assignment, String param, boolean stopped) { + Assignment assignment, String param) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, - tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build())); + tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), assignment).build())); } - private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node, boolean stopped) { - tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action)); + private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { + tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action)); } private DiscoveryNode newNode(String nodeId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index 9408fca0e75..080dc459387 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -44,9 +44,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ int numberOfTasks = randomInt(10); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numberOfTasks; i++) { - boolean stopped = randomBoolean(); tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), - stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment()); + randomAssignment()); if (randomBoolean()) { // From time to time update status tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10))); @@ -136,9 +135,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ } private Builder addRandomTask(Builder builder) { - boolean stopped = randomBoolean(); - builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), - stopped ? new Assignment(null, "stopped") : randomAssignment()); + builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), randomAssignment()); return builder; } @@ -183,7 +180,6 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ assertEquals(testTask.getId(), newTask.getId()); assertEquals(testTask.getStatus(), newTask.getStatus()); assertEquals(testTask.getRequest(), newTask.getRequest()); - assertEquals(testTask.isStopped(), newTask.isStopped()); // Things that shouldn't be serialized assertEquals(0, newTask.getAllocationId()); @@ -220,16 +216,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ if (randomBoolean()) { // Trying to reassign to the same node builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment()); - // should change if the task was stopped AND unassigned - if (task.getExecutorNode() == null && task.isStopped()) { - changed = true; - } } else { // Trying to reassign to a different node Assignment randomAssignment = randomAssignment(); builder.assignTask(lastKnownTask, (s, request) -> randomAssignment); // should change if the task was unassigned and was reassigned to a different node or started - if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) { + if ((task.isAssigned() == false && randomAssignment.isAssigned())) { changed = true; } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java index e105678cc71..760e478f996 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java @@ -47,35 +47,26 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { long[] taskIds = new long[numberOfTasks]; List futures = new ArrayList<>(numberOfTasks); - boolean[] stopped = new boolean[numberOfTasks]; - int runningTasks = 0; for (int i = 0; i < numberOfTasks; i++) { - stopped[i] = randomBoolean(); - if (stopped[i] == false) { - runningTasks++; - } PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); futures.add(future); - service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped[i], true, future); + service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { taskIds[i] = futures.get(i).get(); } - final int numberOfRunningTasks = runningTasks; PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() .custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); - if (numberOfRunningTasks > 0) { - // Make sure that at least one of the tasks is running - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), greaterThan(0)); - }); - } + // Make sure that at least one of the tasks is running + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() + .getTasks().size(), greaterThan(0)); + }); // Restart cluster internalCluster().fullRestart(); @@ -87,30 +78,6 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { for (int i = 0; i < numberOfTasks; i++) { PersistentTask task = tasksInProgress.getTask(taskIds[i]); assertNotNull(task); - assertThat(task.isStopped(), equalTo(stopped[i])); - } - - logger.info("Waiting for {} original tasks to start", numberOfRunningTasks); - assertBusy(() -> { - // Wait for the running task to start automatically - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(numberOfRunningTasks)); - }); - - // Start all other tasks - tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - service = internalCluster().getInstance(PersistentTasksService.class); - for (int i = 0; i < numberOfTasks; i++) { - PersistentTask task = tasksInProgress.getTask(taskIds[i]); - assertNotNull(task); - logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode()); - assertThat(task.isStopped(), equalTo(stopped[i])); - assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue()); - if (stopped[i]) { - PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture(); - service.startTask(task.getId(), startFuture); - assertEquals(startFuture.get(), (Long) task.getId()); - } } logger.info("Waiting for {} tasks to start", numberOfTasks); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index b0bbf2c85c3..040be67766d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -112,67 +112,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { stopOrCancelTask(firstRunningTask.getTaskId()); } - public void testPersistentActionCompletionWithoutRemoval() throws Exception { - boolean stopped = randomBoolean(); - PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped, false, - future); - long taskId = future.get(); - - PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasksInProgress.tasks().size(), equalTo(1)); - assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped)); - assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue()); - assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); - - int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it - logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped); - for (int i = 0; i < numberOfIters; i++) { - logger.info("iteration {}", i); - if (stopped) { - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks(), empty()); - PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture(); - persistentTasksService.startTask(taskId, startFuture); - assertEquals(startFuture.get(), (Long) taskId); - } - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(1)); - }); - TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") - .get().getTasks().get(0); - - stopOrCancelTask(firstRunningTask.getTaskId()); - - assertBusy(() -> { - // Wait for the task to finish - List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") - .get().getTasks(); - logger.info("Found {} tasks", tasks.size()); - assertThat(tasks.size(), equalTo(0)); - }); - stopped = true; - } - - assertBusy(() -> { - // Wait for the task to be marked as stopped - PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(1)); - assertThat(tasks.getTask(taskId).isStopped(), equalTo(true)); - assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); - }); - - logger.info("Removing action record from cluster state"); - PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); - persistentTasksService.removeTask(taskId, removeFuture); - assertEquals(removeFuture.get(), (Long) taskId); - } - public void testPersistentActionWithNoAvailableNode() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index bbcf518041b..93a0ffb3988 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -76,11 +76,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask("test_action", new TestRequest("other_" + i), false, true, + tasks.addTask("test_action", new TestRequest("other_" + i), new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask("test", new TestRequest("this_param"), false, true, + tasks.addTask("test", new TestRequest("this_param"), new Assignment("this_node", "test assignment on this node")); } } @@ -303,7 +303,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, - builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build(); + builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) {