diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java b/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java index efb73471e1e..80faf8083bc 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentActionCoordinator.java @@ -132,6 +132,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl try { RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId()); task.setStatusProvider(runningPersistentTask); + task.setPersistentTaskId(taskInProgress.getId()); PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask); try { runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask); diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java index 7534b15e634..073dfb059aa 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentActionService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; /** @@ -73,4 +74,14 @@ public class PersistentActionService extends AbstractComponent { listener.onFailure(e); } } + + public void updateStatus(long taskId, Task.Status status, ActionListener<UpdatePersistentTaskStatusAction.Response> listener) { + UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, status); + try { + client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTask.java index b122e8cda84..559f970ba92 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTask.java @@ -29,6 +29,8 @@ import org.elasticsearch.tasks.TaskId; public class PersistentTask extends CancellableTask { private Provider<Status> statusProvider; + private long persistentTaskId; + public PersistentTask(long id, String type, String action, String description, TaskId parentTask) { super(id, type, action, description, parentTask); } @@ -52,4 +54,12 @@ public class PersistentTask extends CancellableTask { assert this.statusProvider == null; this.statusProvider = statusProvider; } + + public long getPersistentTaskId() { + return persistentTaskId; + } + + public void setPersistentTaskId(long persistentTaskId) { + this.persistentTaskId = persistentTaskId; + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java index 4a76545bc83..53c8ecefb64 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskClusterService.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.transport.TransportResponse.Empty; @@ -133,13 +134,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C currentTasks.add(taskInProgress); } } - if (found) { - ClusterState.Builder builder = ClusterState.builder(currentState); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks); - return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build(); - } else { - return currentState; - } + return rebuildClusterStateIfNeeded(found, currentState, currentTasks); } @Override @@ -154,6 +149,61 @@ public class PersistentTaskClusterService extends AbstractComponent implements C }); } + /** + * Update task status + * + * @param id the id of a persistent task + * @param status new status + * @param listener the listener that will be called when task is removed + */ + public void updatePersistentTaskStatus(long id, Task.Status status, ActionListener<Empty> listener) { + clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); + if (tasksInProgress == null) { + // Nothing to do, the task no longer exists + return currentState; + } + + boolean found = false; + final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>(); + for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) { + if (taskInProgress.getId() == id) { + assert found == false; + found = true; + currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, status)); + } else { + currentTasks.add(taskInProgress); + } + } + return rebuildClusterStateIfNeeded(found, currentState, currentTasks); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(Empty.INSTANCE); + } + }); + } + + private ClusterState rebuildClusterStateIfNeeded(boolean rebuild, ClusterState oldState, + List<PersistentTaskInProgress<?>> currentTasks) { + if (rebuild) { + ClusterState.Builder builder = ClusterState.builder(oldState); + PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), currentTasks); + return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build(); + } else { + return oldState; + } + } + private <Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request) { TransportPersistentAction<Request> persistentAction = registry.getPersistentActionSafe(action); persistentAction.validate(request, currentState); diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java index 982295c598d..cd29a8e484b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksInProgress.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.Task.Status; import java.io.IOException; import java.util.Collection; @@ -101,23 +103,31 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust private final String action; private final Request request; @Nullable + private final Status status; + @Nullable private final String executorNode; public PersistentTaskInProgress(long id, String action, Request request, String executorNode) { - this(id, 0L, action, request, executorNode); + this(id, 0L, action, request, null, executorNode); } public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, String newExecutorNode) { this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L, - persistentTaskInProgress.action, persistentTaskInProgress.request, newExecutorNode); + persistentTaskInProgress.action, persistentTaskInProgress.request, null, newExecutorNode); } - private PersistentTaskInProgress(long id, long allocationId, String action, Request request, String executorNode) { + public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, Status status) { + this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId, + persistentTaskInProgress.action, persistentTaskInProgress.request, status, persistentTaskInProgress.executorNode); + } + + private PersistentTaskInProgress(long id, long allocationId, String action, Request request, Status status, String executorNode) { this.id = id; this.allocationId = allocationId; this.action = action; this.request = request; + this.status = status; this.executorNode = executorNode; // Update parent request for starting tasks with correct parent task ID request.setParentTask("cluster", id); @@ -129,6 +139,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust allocationId = in.readLong(); action = in.readString(); request = (Request) in.readNamedWriteable(PersistentActionRequest.class); + status = in.readOptionalNamedWriteable(Task.Status.class); executorNode = in.readOptionalString(); } @@ -138,6 +149,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust out.writeLong(allocationId); out.writeString(action); out.writeNamedWriteable(request); + out.writeOptionalNamedWriteable(status); out.writeOptionalString(executorNode); } @@ -150,12 +162,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust allocationId == that.allocationId && Objects.equals(action, that.action) && Objects.equals(request, that.request) && + Objects.equals(status, that.status) && Objects.equals(executorNode, that.executorNode); } @Override public int hashCode() { - return Objects.hash(id, allocationId, action, request, executorNode); + return Objects.hash(id, allocationId, action, request, status, executorNode); } public long getId() { @@ -179,6 +192,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust return executorNode; } + @Nullable + public Status getStatus() { + return status; + } } @Override @@ -224,6 +241,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust builder.field("action", entry.action); builder.field("request"); entry.request.toXContent(builder, params); + if (entry.status != null) { + builder.field("status", entry.status, params); + } builder.field("executor_node", entry.executorNode); } builder.endObject(); diff --git a/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java b/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java index e4fe4eb316d..7e98262a6d6 100644 --- a/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/TransportPersistentAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; @@ -101,9 +102,30 @@ public abstract class TransportPersistentAction<Request extends PersistentAction persistentActionService.sendRequest(actionName, request, listener); } + /** + * Updates the persistent task status in the cluster state. + * <p> + * The status can be used to store the current progress of the task or provide an insight for the + * task allocator about the state of the currently running tasks. + */ + protected void updatePersistentTaskStatus(PersistentTask task, Task.Status status, ActionListener<Empty> listener) { + persistentActionService.updateStatus(task.getPersistentTaskId(), status, + new ActionListener<UpdatePersistentTaskStatusAction.Response>() { + @Override + public void onResponse(UpdatePersistentTaskStatusAction.Response response) { + listener.onResponse(Empty.INSTANCE); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + /** * This operation will be executed on the executor node. - * + * <p> * If nodeOperation throws an exception or triggers listener.onFailure() method, the task will be restarted, * possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully * completed and will be removed from the cluster state and not restarted. diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java new file mode 100644 index 00000000000..332dbceb287 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -0,0 +1,223 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse.Empty; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request, + UpdatePersistentTaskStatusAction.Response, + UpdatePersistentTaskStatusAction.RequestBuilder> { + + public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction(); + public static final String NAME = "cluster:admin/persistent/update_status"; + + private UpdatePersistentTaskStatusAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends MasterNodeRequest<Request> { + + private long taskId; + + private Task.Status status; + + public Request() { + + } + + public Request(long taskId, Task.Status status) { + this.taskId = taskId; + this.status = status; + } + + public void setTaskId(long taskId) { + this.taskId = taskId; + } + + public void setStatus(Task.Status status) { + this.status = status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + taskId = in.readLong(); + status = in.readOptionalNamedWriteable(Task.Status.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(taskId); + out.writeOptionalNamedWriteable(status); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return taskId == request.taskId && + Objects.equals(status, request.status); + } + + @Override + public int hashCode() { + return Objects.hash(taskId, status); + } + } + + public static class Response extends AcknowledgedResponse { + public Response() { + super(); + } + + public Response(boolean acknowledged) { + super(acknowledged); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + readAcknowledged(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + writeAcknowledged(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AcknowledgedResponse that = (AcknowledgedResponse) o; + return isAcknowledged() == that.isAcknowledged(); + } + + @Override + public int hashCode() { + return Objects.hash(isAcknowledged()); + } + + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder<UpdatePersistentTaskStatusAction.Request, + UpdatePersistentTaskStatusAction.Response, UpdatePersistentTaskStatusAction.RequestBuilder> { + + protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) { + super(client, action, new Request()); + } + + public final RequestBuilder setTaskId(long taskId) { + request.setTaskId(taskId); + return this; + } + + public final RequestBuilder setStatus(Task.Status status) { + request.setStatus(status); + return this; + } + + } + + public static class TransportAction extends TransportMasterNodeAction<Request, Response> { + + private final PersistentTaskClusterService persistentTaskClusterService; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + PersistentTaskClusterService persistentTaskClusterService, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, UpdatePersistentTaskStatusAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.persistentTaskClusterService = persistentTaskClusterService; + } + + @Override + protected String executor() { + return ThreadPool.Names.MANAGEMENT; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) { + persistentTaskClusterService.updatePersistentTaskStatus(request.taskId, request.status, new ActionListener<Empty>() { + @Override + public void onResponse(Empty empty) { + listener.onResponse(new Response(true)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java index b4738ec5932..2c1b7ccefb9 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentActionIT.java @@ -34,6 +34,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class PersistentActionIT extends ESIntegTestCase { @@ -132,17 +134,7 @@ public class PersistentActionIT extends ESIntegTestCase { } - assertBusy(() -> { - // Wait for the task to finish - List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() - .getTasks(); - logger.info("Found {} tasks", tasks.size()); - assertThat(tasks.size(), equalTo(0)); - - // Make sure the task is removed from the cluster state - assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE)) - .entries(), empty()); - }); + assertNoRunningTasks(); } public void testPersistentActionWithNoAvailableNode() throws Exception { @@ -176,5 +168,57 @@ public class PersistentActionIT extends ESIntegTestCase { } + public void testPersistentActionStatusUpdate() throws Exception { + TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get(); + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(), + equalTo(1)); + }); + TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") + .get().getTasks().get(0); + + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.entries().size(), equalTo(1)); + assertThat(tasksInProgress.entries().get(0).getStatus(), nullValue()); + + int numberOfUpdates = randomIntBetween(1, 10); + for (int i = 0; i < numberOfUpdates; i++) { + logger.info("Updating the task status"); + // Complete the running task and make sure it finishes properly + assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId()) + .get().getTasks().size(), equalTo(1)); + + int finalI = i; + assertBusy(() -> { + PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + assertThat(tasks.entries().size(), equalTo(1)); + assertThat(tasks.entries().get(0).getStatus(), notNullValue()); + assertThat(tasks.entries().get(0).getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); + }); + + } + + logger.info("Completing the running task"); + // Complete the running task and make sure it finishes properly + assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) + .get().getTasks().size(), equalTo(1)); + + assertNoRunningTasks(); + } + + private void assertNoRunningTasks() throws Exception { + assertBusy(() -> { + // Wait for the task to finish + List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() + .getTasks(); + logger.info("Found {} tasks", tasks.size()); + assertThat(tasks.size(), equalTo(0)); + + // Make sure the task is removed from the cluster state + assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE)) + .entries(), empty()); + }); + } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java index 4bdb6f7bc9c..e9f91740507 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksInProgressTests.java @@ -21,11 +21,14 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.persistent.TestPersistentActionPlugin.Status; import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> { @@ -33,11 +36,16 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC @Override protected PersistentTasksInProgress createTestInstance() { int numberOfTasks = randomInt(10); - List<PersistentTasksInProgress.PersistentTaskInProgress<?>> entries = new ArrayList<>(); + List<PersistentTaskInProgress<?>> entries = new ArrayList<>(); for (int i = 0; i < numberOfTasks; i++) { - entries.add(new PersistentTasksInProgress.PersistentTaskInProgress<>( + PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>( randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)), - randomAsciiOfLength(10))); + randomAsciiOfLength(10)); + if (randomBoolean()) { + // From time to time update status + taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10))); + } + entries.add(taskInProgress); } return new PersistentTasksInProgress(randomLong(), entries); } @@ -49,8 +57,9 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { - return new NamedWriteableRegistry(Collections.singletonList( - new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new) + return new NamedWriteableRegistry(Arrays.asList( + new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new), + new Entry(Task.Status.class, Status.NAME, Status::new) )); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java index 28e7a6fca52..f0a1beaa461 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentActionPlugin.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -55,6 +56,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; @@ -66,7 +68,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import static java.util.Objects.requireNonNull; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.junit.Assert.assertTrue; @@ -83,6 +90,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class), new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), + new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) ); @@ -109,7 +117,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class, PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new), new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), - new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom) + new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) ); } @@ -243,6 +252,63 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { } } + public static class Status implements Task.Status { + public static final String NAME = "test"; + + private final String phase; + + public Status(String phase) { + this.phase = requireNonNull(phase, "Phase cannot be null"); + } + + public Status(StreamInput in) throws IOException { + phase = in.readString(); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("phase", phase); + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return false; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(phase); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != Status.class) { + return false; + } + Status other = (Status) obj; + return phase.equals(other.phase); + } + + @Override + public int hashCode() { + return phase.hashCode(); + } + } + public static class TransportTestPersistentAction extends TransportPersistentAction<TestRequest> { @@ -274,14 +340,41 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; - assertTrue(awaitBusy(() -> testTask.isCancelled() || - testTask.getOperation() != null || - transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes - if (transportService.lifecycleState() == Lifecycle.State.STARTED) { + AtomicInteger phase = new AtomicInteger(); + while (true) { + // wait for something to happen + assertTrue(awaitBusy(() -> testTask.isCancelled() || + testTask.getOperation() != null || + transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes + if (transportService.lifecycleState() != Lifecycle.State.STARTED) { + return; + } if ("finish".equals(testTask.getOperation())) { listener.onResponse(Empty.INSTANCE); + return; } else if ("fail".equals(testTask.getOperation())) { listener.onFailure(new RuntimeException("Simulating failure")); + return; + } else if ("update_status".equals(testTask.getOperation())) { + testTask.setOperation(null); + CountDownLatch latch = new CountDownLatch(1); + Status status = new Status("phase " + phase.incrementAndGet()); + logger.info("updating the task status to {}", status); + updatePersistentTaskStatus(task, status, new ActionListener<Empty>() { + @Override + public void onResponse(Empty empty) { + logger.info("updating was successful"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("updating failed", e); + latch.countDown(); + fail(e.toString()); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); } else if (testTask.isCancelled()) { // Cancellation make cause different ways for the task to finish if (randomBoolean()) { @@ -293,6 +386,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { } else { listener.onFailure(new RuntimeException(testTask.getReasonCancelled())); } + return; } else { fail("We really shouldn't be here"); } @@ -432,8 +526,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { @Inject public TransportTestTaskAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, String nodeExecutor) { + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, String nodeExecutor) { super(settings, TestTaskAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT); }