Add support for task status on persistent tasks

Similarly to task status on normal tasks it's now possible to update task status on the persistent tasks. This should allow updating the state of the running tasks (such as loading, started, etc) as well as store intermediate state or progress.

Original commit: elastic/x-pack@048006b467
This commit is contained in:
Igor Motov 2017-02-01 18:06:22 -05:00 committed by Martijn van Groningen
parent 777b21f2ef
commit ac67d02bc3
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
10 changed files with 520 additions and 36 deletions

View File

@ -132,6 +132,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
try {
RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId());
task.setStatusProvider(runningPersistentTask);
task.setPersistentTaskId(taskInProgress.getId());
PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask);
try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
@ -73,4 +74,14 @@ public class PersistentActionService extends AbstractComponent {
listener.onFailure(e);
}
}
public void updateStatus(long taskId, Task.Status status, ActionListener<UpdatePersistentTaskStatusAction.Response> listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, status);
try {
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.tasks.TaskId;
public class PersistentTask extends CancellableTask {
private Provider<Status> statusProvider;
private long persistentTaskId;
public PersistentTask(long id, String type, String action, String description, TaskId parentTask) {
super(id, type, action, description, parentTask);
}
@ -52,4 +54,12 @@ public class PersistentTask extends CancellableTask {
assert this.statusProvider == null;
this.statusProvider = statusProvider;
}
public long getPersistentTaskId() {
return persistentTaskId;
}
public void setPersistentTaskId(long persistentTaskId) {
this.persistentTaskId = persistentTaskId;
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.transport.TransportResponse.Empty;
@ -133,13 +134,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
currentTasks.add(taskInProgress);
}
}
if (found) {
ClusterState.Builder builder = ClusterState.builder(currentState);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks);
return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build();
} else {
return currentState;
}
return rebuildClusterStateIfNeeded(found, currentState, currentTasks);
}
@Override
@ -154,6 +149,61 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
});
}
/**
* Update task status
*
* @param id the id of a persistent task
* @param status new status
* @param listener the listener that will be called when task is removed
*/
public void updatePersistentTaskStatus(long id, Task.Status status, ActionListener<Empty> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress == null) {
// Nothing to do, the task no longer exists
return currentState;
}
boolean found = false;
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (taskInProgress.getId() == id) {
assert found == false;
found = true;
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, status));
} else {
currentTasks.add(taskInProgress);
}
}
return rebuildClusterStateIfNeeded(found, currentState, currentTasks);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE);
}
});
}
private ClusterState rebuildClusterStateIfNeeded(boolean rebuild, ClusterState oldState,
List<PersistentTaskInProgress<?>> currentTasks) {
if (rebuild) {
ClusterState.Builder builder = ClusterState.builder(oldState);
PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), currentTasks);
return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build();
} else {
return oldState;
}
}
private <Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request) {
TransportPersistentAction<Request> persistentAction = registry.getPersistentActionSafe(action);
persistentAction.validate(request, currentState);

View File

@ -28,6 +28,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import java.io.IOException;
import java.util.Collection;
@ -101,23 +103,31 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
private final String action;
private final Request request;
@Nullable
private final Status status;
@Nullable
private final String executorNode;
public PersistentTaskInProgress(long id, String action, Request request, String executorNode) {
this(id, 0L, action, request, executorNode);
this(id, 0L, action, request, null, executorNode);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, String newExecutorNode) {
this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L,
persistentTaskInProgress.action, persistentTaskInProgress.request, newExecutorNode);
persistentTaskInProgress.action, persistentTaskInProgress.request, null, newExecutorNode);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request, String executorNode) {
public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, Status status) {
this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId,
persistentTaskInProgress.action, persistentTaskInProgress.request, status, persistentTaskInProgress.executorNode);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request, Status status, String executorNode) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
this.request = request;
this.status = status;
this.executorNode = executorNode;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
@ -129,6 +139,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
allocationId = in.readLong();
action = in.readString();
request = (Request) in.readNamedWriteable(PersistentActionRequest.class);
status = in.readOptionalNamedWriteable(Task.Status.class);
executorNode = in.readOptionalString();
}
@ -138,6 +149,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
out.writeLong(allocationId);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(executorNode);
}
@ -150,12 +162,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
allocationId == that.allocationId &&
Objects.equals(action, that.action) &&
Objects.equals(request, that.request) &&
Objects.equals(status, that.status) &&
Objects.equals(executorNode, that.executorNode);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, executorNode);
return Objects.hash(id, allocationId, action, request, status, executorNode);
}
public long getId() {
@ -179,6 +192,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
return executorNode;
}
@Nullable
public Status getStatus() {
return status;
}
}
@Override
@ -224,6 +241,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<Clust
builder.field("action", entry.action);
builder.field("request");
entry.request.toXContent(builder, params);
if (entry.status != null) {
builder.field("status", entry.status, params);
}
builder.field("executor_node", entry.executorNode);
}
builder.endObject();

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
@ -101,9 +102,30 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
persistentActionService.sendRequest(actionName, request, listener);
}
/**
* Updates the persistent task status in the cluster state.
* <p>
* The status can be used to store the current progress of the task or provide an insight for the
* task allocator about the state of the currently running tasks.
*/
protected void updatePersistentTaskStatus(PersistentTask task, Task.Status status, ActionListener<Empty> listener) {
persistentActionService.updateStatus(task.getPersistentTaskId(), status,
new ActionListener<UpdatePersistentTaskStatusAction.Response>() {
@Override
public void onResponse(UpdatePersistentTaskStatusAction.Response response) {
listener.onResponse(Empty.INSTANCE);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
/**
* This operation will be executed on the executor node.
*
* <p>
* If nodeOperation throws an exception or triggers listener.onFailure() method, the task will be restarted,
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
* completed and will be removed from the cluster state and not restarted.

View File

@ -0,0 +1,223 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTaskStatusAction.Request,
UpdatePersistentTaskStatusAction.Response,
UpdatePersistentTaskStatusAction.RequestBuilder> {
public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction();
public static final String NAME = "cluster:admin/persistent/update_status";
private UpdatePersistentTaskStatusAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
private Task.Status status;
public Request() {
}
public Request(long taskId, Task.Status status) {
this.taskId = taskId;
this.status = status;
}
public void setTaskId(long taskId) {
this.taskId = taskId;
}
public void setStatus(Task.Status status) {
this.status = status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
status = in.readOptionalNamedWriteable(Task.Status.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
out.writeOptionalNamedWriteable(status);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId &&
Objects.equals(status, request.status);
}
@Override
public int hashCode() {
return Objects.hash(taskId, status);
}
}
public static class Response extends AcknowledgedResponse {
public Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<UpdatePersistentTaskStatusAction.Request,
UpdatePersistentTaskStatusAction.Response, UpdatePersistentTaskStatusAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) {
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
request.setTaskId(taskId);
return this;
}
public final RequestBuilder setStatus(Task.Status status) {
request.setStatus(status);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, UpdatePersistentTaskStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.updatePersistentTaskStatus(request.taskId, request.status, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -34,6 +34,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class PersistentActionIT extends ESIntegTestCase {
@ -132,17 +134,7 @@ public class PersistentActionIT extends ESIntegTestCase {
}
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.entries(), empty());
});
assertNoRunningTasks();
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
@ -176,5 +168,57 @@ public class PersistentActionIT extends ESIntegTestCase {
}
public void testPersistentActionStatusUpdate() throws Exception {
TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasksInProgress.entries().size(), equalTo(1));
assertThat(tasksInProgress.entries().get(0).getStatus(), nullValue());
int numberOfUpdates = randomIntBetween(1, 10);
for (int i = 0; i < numberOfUpdates; i++) {
logger.info("Updating the task status");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
int finalI = i;
assertBusy(() -> {
PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE);
assertThat(tasks.entries().size(), equalTo(1));
assertThat(tasks.entries().get(0).getStatus(), notNullValue());
assertThat(tasks.entries().get(0).getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
});
}
logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
assertNoRunningTasks();
}
private void assertNoRunningTasks() throws Exception {
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.entries(), empty());
});
}
}

View File

@ -21,11 +21,14 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.persistent.TestPersistentActionPlugin.Status;
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> {
@ -33,11 +36,16 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC
@Override
protected PersistentTasksInProgress createTestInstance() {
int numberOfTasks = randomInt(10);
List<PersistentTasksInProgress.PersistentTaskInProgress<?>> entries = new ArrayList<>();
List<PersistentTaskInProgress<?>> entries = new ArrayList<>();
for (int i = 0; i < numberOfTasks; i++) {
entries.add(new PersistentTasksInProgress.PersistentTaskInProgress<>(
PersistentTaskInProgress<?> taskInProgress = new PersistentTaskInProgress<>(
randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)),
randomAsciiOfLength(10)));
randomAsciiOfLength(10));
if (randomBoolean()) {
// From time to time update status
taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10)));
}
entries.add(taskInProgress);
}
return new PersistentTasksInProgress(randomLong(), entries);
}
@ -49,8 +57,9 @@ public class PersistentTasksInProgressTests extends AbstractWireSerializingTestC
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new)
return new NamedWriteableRegistry(Arrays.asList(
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new),
new Entry(Task.Status.class, Status.NAME, Status::new)
));
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -55,6 +56,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
@ -66,7 +68,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.junit.Assert.assertTrue;
@ -83,6 +90,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class),
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
);
@ -109,7 +117,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom)
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
);
}
@ -243,6 +252,63 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
}
}
public static class Status implements Task.Status {
public static final String NAME = "test";
private final String phase;
public Status(String phase) {
this.phase = requireNonNull(phase, "Phase cannot be null");
}
public Status(StreamInput in) throws IOException {
phase = in.readString();
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("phase", phase);
builder.endObject();
return builder;
}
@Override
public boolean isFragment() {
return false;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(phase);
}
@Override
public String toString() {
return Strings.toString(this);
}
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != Status.class) {
return false;
}
Status other = (Status) obj;
return phase.equals(other.phase);
}
@Override
public int hashCode() {
return phase.hashCode();
}
}
public static class TransportTestPersistentAction extends TransportPersistentAction<TestRequest> {
@ -274,14 +340,41 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
assertTrue(awaitBusy(() -> testTask.isCancelled() ||
testTask.getOperation() != null ||
transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes
if (transportService.lifecycleState() == Lifecycle.State.STARTED) {
AtomicInteger phase = new AtomicInteger();
while (true) {
// wait for something to happen
assertTrue(awaitBusy(() -> testTask.isCancelled() ||
testTask.getOperation() != null ||
transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes
if (transportService.lifecycleState() != Lifecycle.State.STARTED) {
return;
}
if ("finish".equals(testTask.getOperation())) {
listener.onResponse(Empty.INSTANCE);
return;
} else if ("fail".equals(testTask.getOperation())) {
listener.onFailure(new RuntimeException("Simulating failure"));
return;
} else if ("update_status".equals(testTask.getOperation())) {
testTask.setOperation(null);
CountDownLatch latch = new CountDownLatch(1);
Status status = new Status("phase " + phase.incrementAndGet());
logger.info("updating the task status to {}", status);
updatePersistentTaskStatus(task, status, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
logger.info("updating was successful");
latch.countDown();
}
@Override
public void onFailure(Exception e) {
logger.info("updating failed", e);
latch.countDown();
fail(e.toString());
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
} else if (testTask.isCancelled()) {
// Cancellation make cause different ways for the task to finish
if (randomBoolean()) {
@ -293,6 +386,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
} else {
listener.onFailure(new RuntimeException(testTask.getReasonCancelled()));
}
return;
} else {
fail("We really shouldn't be here");
}
@ -432,8 +526,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
@Inject
public TransportTestTaskAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, String nodeExecutor) {
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, String nodeExecutor) {
super(settings, TestTaskAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}