diff --git a/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java index d0d52ed4a70..519bc21acc5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java @@ -69,10 +69,6 @@ public class CreatePersistentTaskAction extends Action { @@ -235,7 +193,7 @@ public class CreatePersistentTaskAction extends Action listener) { - persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion, + persistentTasksClusterService.createPersistentTask(request.action, request.request, new ActionListener() { @Override public void onResponse(Long newTaskId) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 8203ff3396c..5f22dd25a22 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -61,21 +61,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param request request * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String action, Request request, boolean stopped, - boolean removeOnCompletion, + public void createPersistentTask(String action, Request request, ActionListener listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { validate(action, clusterService.state(), request); final Assignment assignment; - if (stopped) { - // the task is stopped no need to assign it anywhere - assignment = PersistentTasksCustomMetaData.FINISHED_TASK_ASSIGNMENT; - } else { - assignment = getAssignement(action, currentState, request); - } - return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment)); + assignment = getAssignement(action, currentState, request); + return update(currentState, builder(currentState).addTask(action, request, assignment)); } @Override @@ -319,11 +313,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment); } } else { - if (task.isStopped()) { - logger.trace("ignoring task {} because it is stopped", task.getId()); - } else { - logger.trace("ignoring task {} because it is still running", task.getId()); - } + logger.trace("ignoring task {} because it is still running", task.getId()); } } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index 3805a510c39..a5048052a18 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -97,8 +97,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskBuilder, List objects) -> { if (objects.size() != 1) { @@ -240,8 +238,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable task, boolean stopped, Assignment assignment) { - this(task.id, task.allocationId + 1L, task.taskName, task.request, stopped, task.removeOnCompletion, task.status, + public PersistentTask(PersistentTask task, Assignment assignment) { + this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status, assignment, task.allocationId); } public PersistentTask(PersistentTask task, Status status) { - this(task.id, task.allocationId, task.taskName, task.request, task.stopped, task.removeOnCompletion, status, + this(task.id, task.allocationId, task.taskName, task.request, status, task.assignment, task.allocationId); } - private PersistentTask(long id, long allocationId, String taskName, Request request, boolean stopped, boolean removeOnCompletion, + private PersistentTask(long id, long allocationId, String taskName, Request request, Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.taskName = taskName; this.request = request; this.status = status; - this.stopped = stopped; - this.removeOnCompletion = removeOnCompletion; this.assignment = assignment; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; // Update parent request for starting tasks with correct parent task ID @@ -284,8 +278,6 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable setStopped(boolean stopped) { - this.stopped = stopped; - return this; - } - - public TaskBuilder setRemoveOnCompletion(boolean removeOnCompletion) { - this.removeOnCompletion = removeOnCompletion; - return this; - } - public TaskBuilder setAssignment(Assignment assignment) { this.assignment = assignment; return this; @@ -492,7 +458,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable build() { - return new PersistentTask<>(id, allocationId, taskName, request, stopped, removeOnCompletion, status, + return new PersistentTask<>(id, allocationId, taskName, request, status, assignment, allocationIdOnLastStatusUpdate); } } @@ -578,11 +544,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable * After the task is added its id can be found by calling {{@link #getCurrentId()}} method. */ - public Builder addTask(String taskName, Request request, boolean stopped, - boolean removeOnCompletion, Assignment assignment) { + public Builder addTask(String taskName, Request request, Assignment assignment) { changed = true; currentId++; - tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, stopped, removeOnCompletion, assignment)); + tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment)); return this; } @@ -593,7 +558,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); } return this; } @@ -609,9 +574,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = (PersistentTask) tasks.get(taskId); if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request); - if (assignment.isAssigned() || taskInProgress.isStopped()) { + if (assignment.isAssigned()) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); } } return this; @@ -649,11 +614,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - if (taskInProgress.removeOnCompletion) { - tasks.remove(taskId); - } else { - tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT)); - } + tasks.remove(taskId); } return this; } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 807b4d63860..ff3e49c8d79 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -42,27 +42,14 @@ public class PersistentTasksService extends AbstractComponent { this.clusterService = clusterService; } - /** - * Creates the specified persistent action and tries to start it immediately, upon completion the task is - * removed from the cluster state - */ - public void createPersistentActionTask(String action, Request request, - PersistentTaskOperationListener listener) { - createPersistentActionTask(action, request, false, true, listener); - } - /** * Creates the specified persistent action. The action is started unless the stopped parameter is equal to true. * If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion. * Otherwise it will remain there in the stopped state. */ public void createPersistentActionTask(String action, Request request, - boolean stopped, - boolean removeOnCompletion, PersistentTaskOperationListener listener) { CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request); - createPersistentActionRequest.setStopped(stopped); - createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion); try { client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( o -> listener.onResponse(o.getTaskId()), listener::onFailure)); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index fd538f8d6ec..e5dda159961 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -92,7 +92,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(2, 40); for (int i = 0; i < numberOfTasks; i++) { - addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits"); } MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); @@ -113,21 +113,17 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(0, 40); for (int i = 0; i < numberOfTasks; i++) { - switch (randomInt(3)) { + switch (randomInt(2)) { case 0: // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned - addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits"); break; case 1: // add a task assigned to non-existing node that should not get assigned - addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits"); break; case 2: - // add a stopped task assigned to non-existing node that should not get assigned - addTask(tasks, "should_not_assign", "fail_me_if_called", null, true); - break; - case 3: - addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits"); break; } @@ -145,39 +141,34 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { int assignOneCount = 0; for (PersistentTask task : tasksInProgress.tasks()) { - if (task.isStopped()) { - assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); - assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getTaskName())); - } else { - // explanation should correspond to the action name - switch (task.getTaskName()) { - case "should_assign": - assertThat(task.getExecutorNode(), notNullValue()); - assertThat(task.isAssigned(), equalTo(true)); - if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { - logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString()); - } - assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), - clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + // explanation should correspond to the action name + switch (task.getTaskName()) { + case "should_assign": + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { + logger.info(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE).toString()); + } + assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), + clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + break; + case "should_not_assign": + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); + break; + case "assign_one": + if (task.isAssigned()) { + assignOneCount++; + assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); - break; - case "should_not_assign": - assertThat(task.getExecutorNode(), nullValue()); - assertThat(task.isAssigned(), equalTo(false)); - assertThat(task.getAssignment().getExplanation(), equalTo("no appropriate nodes found for the assignment")); - break; - case "assign_one": - if (task.isAssigned()) { - assignOneCount++; - assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); - assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); - } else { - assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time")); - } - break; - default: - fail("Unknown action " + task.getTaskName()); - } + } else { + assertThat(task.getAssignment().getExplanation(), equalTo("only one task can be assigned at a time")); + } + break; + default: + fail("Unknown action " + task.getTaskName()); } } } @@ -218,8 +209,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { DiscoveryNodes nodes = clusterState.nodes(); PersistentTasksCustomMetaData tasksInProgress = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasksInProgress.findTasks("assign_one", - task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { + if (tasksInProgress.findTasks("assign_one", task -> nodes.nodeExists(task.getExecutorNode())).isEmpty()) { return randomNodeAssignment(clusterState.nodes()); } else { return new Assignment(null, "only one task can be assigned at a time"); @@ -269,13 +259,12 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { // we don't have any unassigned tasks - add some if (randomBoolean()) { logger.info("added random task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null, - false); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), null); tasksOrNodesChanged = true; } else { logger.info("added unassignable task with custom assignment message"); addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksCustomMetaData.builder(tasks), - new Assignment(null, "change me"), "never_assign", false); + new Assignment(null, "change me"), "never_assign"); tasksOrNodesChanged = true; } } @@ -332,23 +321,16 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { } if (randomBoolean()) { logger.info("added random unassignable task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign", false); - return builder.build(); - } - if (randomBoolean()) { - // add unassigned task in stopped state - logger.info("added random stopped task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, null, true); - return builder.build(); - } else { - logger.info("changed routing table"); - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); - metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()); - RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); - changeRoutingTable(metaData, routingTable); - builder.metaData(metaData).routingTable(routingTable.build()); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), tasksBuilder, NO_NODE_FOUND, "never_assign"); return builder.build(); } + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + return builder.build(); } } if (randomBoolean()) { @@ -398,9 +380,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { return false; } return tasks.tasks().stream().anyMatch(task -> { - if (task.isStopped()) { - return false; - } if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) { return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false; } @@ -415,20 +394,20 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, - String node, boolean stopped) { + String node) { return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)), - randomAsciiOfLength(10), stopped); + randomAsciiOfLength(10)); } private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, - Assignment assignment, String param, boolean stopped) { + Assignment assignment, String param) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, - tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build())); + tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), assignment).build())); } - private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node, boolean stopped) { - tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action)); + private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { + tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action)); } private DiscoveryNode newNode(String nodeId) { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index dfeb7152e1c..3465407db52 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -57,9 +57,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ int numberOfTasks = randomInt(10); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numberOfTasks; i++) { - boolean stopped = randomBoolean(); tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), - stopped, randomBoolean(), stopped ? new Assignment(null, "stopped") : randomAssignment()); + randomAssignment()); if (randomBoolean()) { // From time to time update status tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10))); @@ -149,9 +148,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ } private Builder addRandomTask(Builder builder) { - boolean stopped = randomBoolean(); - builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), - stopped ? new Assignment(null, "stopped") : randomAssignment()); + builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAsciiOfLength(10)), randomAssignment()); return builder; } @@ -196,7 +193,6 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ assertEquals(testTask.getId(), newTask.getId()); assertEquals(testTask.getStatus(), newTask.getStatus()); assertEquals(testTask.getRequest(), newTask.getRequest()); - assertEquals(testTask.isStopped(), newTask.isStopped()); // Things that shouldn't be serialized assertEquals(0, newTask.getAllocationId()); @@ -233,16 +229,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ if (randomBoolean()) { // Trying to reassign to the same node builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment()); - // should change if the task was stopped AND unassigned - if (task.getExecutorNode() == null && task.isStopped()) { - changed = true; - } } else { // Trying to reassign to a different node Assignment randomAssignment = randomAssignment(); builder.assignTask(lastKnownTask, (s, request) -> randomAssignment); // should change if the task was unassigned and was reassigned to a different node or started - if ((task.isAssigned() == false && randomAssignment.isAssigned()) || task.isStopped()) { + if ((task.isAssigned() == false && randomAssignment.isAssigned())) { changed = true; } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java index a13d683673f..a4c7061da05 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java @@ -60,35 +60,26 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { long[] taskIds = new long[numberOfTasks]; List futures = new ArrayList<>(numberOfTasks); - boolean[] stopped = new boolean[numberOfTasks]; - int runningTasks = 0; for (int i = 0; i < numberOfTasks; i++) { - stopped[i] = randomBoolean(); - if (stopped[i] == false) { - runningTasks++; - } PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); futures.add(future); - service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped[i], true, future); + service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { taskIds[i] = futures.get(i).get(); } - final int numberOfRunningTasks = runningTasks; PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() .custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); - if (numberOfRunningTasks > 0) { - // Make sure that at least one of the tasks is running - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), greaterThan(0)); - }); - } + // Make sure that at least one of the tasks is running + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() + .getTasks().size(), greaterThan(0)); + }); // Restart cluster internalCluster().fullRestart(); @@ -100,30 +91,6 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { for (int i = 0; i < numberOfTasks; i++) { PersistentTask task = tasksInProgress.getTask(taskIds[i]); assertNotNull(task); - assertThat(task.isStopped(), equalTo(stopped[i])); - } - - logger.info("Waiting for {} original tasks to start", numberOfRunningTasks); - assertBusy(() -> { - // Wait for the running task to start automatically - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(numberOfRunningTasks)); - }); - - // Start all other tasks - tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - service = internalCluster().getInstance(PersistentTasksService.class); - for (int i = 0; i < numberOfTasks; i++) { - PersistentTask task = tasksInProgress.getTask(taskIds[i]); - assertNotNull(task); - logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode()); - assertThat(task.isStopped(), equalTo(stopped[i])); - assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue()); - if (stopped[i]) { - PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture(); - service.startTask(task.getId(), startFuture); - assertEquals(startFuture.get(), (Long) task.getId()); - } } logger.info("Waiting for {} tasks to start", numberOfTasks); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 87793860caa..6bc3d297be4 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -126,67 +126,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { stopOrCancelTask(firstRunningTask.getTaskId()); } - public void testPersistentActionCompletionWithoutRemoval() throws Exception { - boolean stopped = randomBoolean(); - PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), stopped, false, - future); - long taskId = future.get(); - - PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasksInProgress.tasks().size(), equalTo(1)); - assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped)); - assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue()); - assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); - - int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it - logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped); - for (int i = 0; i < numberOfIters; i++) { - logger.info("iteration {}", i); - if (stopped) { - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks(), empty()); - PersistentTaskOperationFuture startFuture = new PersistentTaskOperationFuture(); - persistentTasksService.startTask(taskId, startFuture); - assertEquals(startFuture.get(), (Long) taskId); - } - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(1)); - }); - TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") - .get().getTasks().get(0); - - stopOrCancelTask(firstRunningTask.getTaskId()); - - assertBusy(() -> { - // Wait for the task to finish - List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") - .get().getTasks(); - logger.info("Found {} tasks", tasks.size()); - assertThat(tasks.size(), equalTo(0)); - }); - stopped = true; - } - - assertBusy(() -> { - // Wait for the task to be marked as stopped - PersistentTasksCustomMetaData tasks = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(1)); - assertThat(tasks.getTask(taskId).isStopped(), equalTo(true)); - assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); - }); - - logger.info("Removing action record from cluster state"); - PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); - persistentTasksService.removeTask(taskId, removeFuture); - assertEquals(removeFuture.get(), (Long) taskId); - } - public void testPersistentActionWithNoAvailableNode() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 1eb235ef639..5f74f6963c7 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -90,11 +90,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask("test_action", new TestRequest("other_" + i), false, true, + tasks.addTask("test_action", new TestRequest("other_" + i), new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask("test", new TestRequest("this_param"), false, true, + tasks.addTask("test", new TestRequest("this_param"), new Assignment("this_node", "test assignment on this node")); } } @@ -317,7 +317,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, - builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build(); + builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) {