Adds support for persistent actions

A persistent action is a transport-like action that is using the cluster state instead of transport to start tasks. This allows persistent tasks to survive restart of executing nodes. A persistent action can be implemented by extending TransportPersistentAction. TransportPersistentAction will start the task by using PersistentActionService, which controls persistent tasks lifecycle.  See TestPersistentActionPlugin for an example implementing a persistent action.

Original commit: elastic/x-pack-elasticsearch@8ef4103cd6
This commit is contained in:
Igor Motov 2017-01-24 20:51:16 -05:00
parent ff65c38253
commit 827118e154
25 changed files with 3042 additions and 4 deletions

View File

@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -24,6 +25,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.PersistentTaskClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestController;
@ -165,8 +174,12 @@ public class MlPlugin extends Plugin implements ActionPlugin {
public List<NamedWriteableRegistry.Entry> getNamedWriteables() { public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList( return Arrays.asList(
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new), new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new) new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
); new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom)
);
} }
@Override @Override
@ -218,6 +231,8 @@ public class MlPlugin extends Plugin implements ActionPlugin {
autodetectProcessFactory, normalizerFactory); autodetectProcessFactory, normalizerFactory);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider,
System::currentTimeMillis); System::currentTimeMillis);
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client);
PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY);
return Arrays.asList( return Arrays.asList(
jobProvider, jobProvider,
@ -225,7 +240,10 @@ public class MlPlugin extends Plugin implements ActionPlugin {
dataProcessor, dataProcessor,
new MlInitializationService(settings, threadPool, clusterService, jobProvider), new MlInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister, jobDataCountsPersister,
datafeedJobRunner datafeedJobRunner,
persistentActionService,
persistentActionRegistry,
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService)
); );
} }
@ -302,7 +320,10 @@ public class MlPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
new ActionHandler<>(InternalStartDatafeedAction.INSTANCE, InternalStartDatafeedAction.TransportAction.class), new ActionHandler<>(InternalStartDatafeedAction.INSTANCE, InternalStartDatafeedAction.TransportAction.class),
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class) new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
); );
} }

View File

@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
*/
public class CompletionPersistentTaskAction extends Action<CompletionPersistentTaskAction.Request,
CompletionPersistentTaskAction.Response,
CompletionPersistentTaskAction.RequestBuilder> {
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/completion";
private CompletionPersistentTaskAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
private Exception exception;
public Request() {
}
public Request(long taskId, Exception exception) {
this.taskId = taskId;
this.exception = exception;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
exception = in.readException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
out.writeException(exception);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId &&
Objects.equals(exception, request.exception);
}
@Override
public int hashCode() {
return Objects.hash(taskId, exception);
}
}
public static class Response extends AcknowledgedResponse {
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CompletionPersistentTaskAction.Request,
CompletionPersistentTaskAction.Response, CompletionPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAction action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CompletionPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(newResponse());
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -0,0 +1,388 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportResponse.Empty;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
/**
* This component is responsible for coordination of execution of persistent actions on individual nodes. It runs on all
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
*/
public class PersistentActionCoordinator extends AbstractComponent implements ClusterStateListener {
private final Map<PersistentTaskId, RunningPersistentTask> runningTasks = new HashMap<>();
private final PersistentActionService persistentActionService;
private final PersistentActionRegistry persistentActionRegistry;
private final TaskManager taskManager;
private final PersistentActionExecutor persistentActionExecutor;
public PersistentActionCoordinator(Settings settings,
PersistentActionService persistentActionService,
PersistentActionRegistry persistentActionRegistry,
TaskManager taskManager,
PersistentActionExecutor persistentActionExecutor) {
super(settings);
this.persistentActionService = persistentActionService;
this.persistentActionRegistry = persistentActionRegistry;
this.taskManager = taskManager;
this.persistentActionExecutor = persistentActionExecutor;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE);
PersistentTasksInProgress previousTasks = event.previousState().custom(PersistentTasksInProgress.TYPE);
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node
String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) {
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);
if (persistentTask == null) {
// New task - let's start it
startTask(taskInProgress);
} else {
// The task is still running
notVisitedTasks.remove(persistentTaskId);
if (persistentTask.getState() == State.FAILED_NOTIFICATION) {
// We tried to notify the master about this task before but the notification failed and
// the master doesn't seem to know about it - retry notification
restartCompletionNotification(persistentTask);
}
}
}
}
}
for (PersistentTaskId id : notVisitedTasks) {
RunningPersistentTask task = runningTasks.get(id);
if (task.getState() == State.NOTIFIED || task.getState() == State.FAILED) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
finishTask(id);
} else if (task.getState() == State.FAILED_NOTIFICATION) {
// We tried to send result to master, but it failed and master doesn't know about this task
// this shouldn't really happen, unless this node is severally out of sync with the master
logger.warn("failed to notify master about task {}", task.getId());
finishTask(id);
} else {
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
// cancel the task without notifying master
cancelTask(id);
}
}
}
}
private <Request extends PersistentActionRequest> void startTask(PersistentTaskInProgress<Request> taskInProgress) {
PersistentActionRegistry.PersistentActionHolder<Request> holder =
persistentActionRegistry.getPersistentActionHolderSafe(taskInProgress.getAction());
PersistentTask task = (PersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
taskInProgress.getRequest());
boolean processed = false;
try {
RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId());
task.setStatusProvider(runningPersistentTask);
PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask);
try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask);
persistentActionExecutor.executeAction(taskInProgress.getRequest(), task, holder, listener);
} catch (Exception e) {
// Submit task failure
listener.onFailure(e);
}
processed = true;
} finally {
if (processed == false) {
// something went wrong - unregistering task
taskManager.unregister(task);
}
}
}
private void finishTask(PersistentTaskId persistentTaskId) {
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
if (task != null && task.getTask() != null) {
taskManager.unregister(task.getTask());
}
}
private void cancelTask(PersistentTaskId persistentTaskId) {
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
if (task != null && task.getTask() != null) {
if (task.markAsCancelled()) {
persistentActionService.sendCancellation(task.getTask().getId(), new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
}
@Override
public void onFailure(Exception e) {
// There is really nothing we can do in case of failure here
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getId()), e);
}
});
}
}
}
private void restartCompletionNotification(RunningPersistentTask task) {
logger.trace("resending notification for task {}", task.getId());
if (task.getState() == State.CANCELLED) {
taskManager.unregister(task.getTask());
} else {
if (task.restartCompletionNotification()) {
persistentActionService.sendCompletionNotification(task.getId(), task.getFailure(), new PublishedResponseListener(task));
} else {
logger.warn("attempt to resend notification for task {} in the {} state", task.getId(), task.getState());
}
}
}
private void startCompletionNotification(RunningPersistentTask task, Exception e) {
if (task.getState() == State.CANCELLED) {
taskManager.unregister(task.getTask());
} else {
logger.trace("sending notification for failed task {}", task.getId());
if (task.startNotification(e)) {
persistentActionService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task));
} else {
logger.warn("attempt to send notification for task {} in the {} state", task.getId(), task.getState());
}
}
}
private class PersistentTaskListener implements ActionListener<Empty> {
private final RunningPersistentTask task;
public PersistentTaskListener(final RunningPersistentTask task) {
this.task = task;
}
@Override
public void onResponse(Empty response) {
startCompletionNotification(task, null);
}
@Override
public void onFailure(Exception e) {
if (task.getTask().isCancelled()) {
// The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException
if (e instanceof TaskCancelledException == false) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"cancelled task {} failed with an exception, cancellation reason [{}]",
task.getId(), task.getTask().getReasonCancelled()), e);
}
startCompletionNotification(task, null);
} else {
startCompletionNotification(task, e);
}
}
}
private class PublishedResponseListener implements ActionListener<CompletionPersistentTaskAction.Response> {
private final RunningPersistentTask task;
public PublishedResponseListener(final RunningPersistentTask task) {
this.task = task;
}
@Override
public void onResponse(CompletionPersistentTaskAction.Response response) {
logger.trace("notification for task {} was successful", task.getId());
if (task.markAsNotified() == false) {
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getId(), task.getState());
}
taskManager.unregister(task.getTask());
}
@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getId()), e);
if (task.notificationFailed() == false) {
logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getId(), task.getState());
}
}
}
public enum State {
STARTED, // the task is currently running
CANCELLED, // the task is cancelled
FAILED, // the task is done running and trying to notify caller
FAILED_NOTIFICATION, // the caller notification failed
NOTIFIED // the caller was notified, the task can be removed
}
private static class PersistentTaskId {
private final long id;
private final long allocationId;
public PersistentTaskId(long id, long allocationId) {
this.id = id;
this.allocationId = allocationId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTaskId that = (PersistentTaskId) o;
return id == that.id &&
allocationId == that.allocationId;
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId);
}
}
private static class RunningPersistentTask implements Provider<Task.Status> {
private final PersistentTask task;
private final long id;
private final AtomicReference<State> state;
@Nullable
private Exception failure;
public RunningPersistentTask(PersistentTask task, long id) {
this(task, id, State.STARTED);
}
public RunningPersistentTask(PersistentTask task, long id, State state) {
this.task = task;
this.id = id;
this.state = new AtomicReference<>(state);
}
public PersistentTask getTask() {
return task;
}
public long getId() {
return id;
}
public State getState() {
return state.get();
}
public Exception getFailure() {
return failure;
}
public boolean startNotification(Exception failure) {
boolean result = state.compareAndSet(State.STARTED, State.FAILED);
if (result) {
this.failure = failure;
}
return result;
}
public boolean notificationFailed() {
return state.compareAndSet(State.FAILED, State.FAILED_NOTIFICATION);
}
public boolean restartCompletionNotification() {
return state.compareAndSet(State.FAILED_NOTIFICATION, State.FAILED);
}
public boolean markAsNotified() {
return state.compareAndSet(State.FAILED, State.NOTIFIED);
}
public boolean markAsCancelled() {
return state.compareAndSet(State.STARTED, State.CANCELLED);
}
@Override
public Task.Status get() {
return new Status(state.get());
}
}
public static class Status implements Task.Status {
public static final String NAME = "persistent_executor";
private final State state;
public Status(State state) {
this.state = requireNonNull(state, "State cannot be null");
}
public Status(StreamInput in) throws IOException {
state = State.valueOf(in.readString());
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("state", state.toString());
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(state.toString());
}
@Override
public String toString() {
return Strings.toString(this);
}
public State getState() {
return state;
}
@Override
public boolean isFragment() {
return false;
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
/**
* This component is responsible for execution of persistent actions.
*/
public class PersistentActionExecutor {
private final ThreadPool threadPool;
public PersistentActionExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}
public <Request extends PersistentActionRequest> void executeAction(Request request,
PersistentTask task,
PersistentActionRegistry.PersistentActionHolder<Request> holder,
ActionListener<Empty> listener) {
threadPool.executor(holder.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
@SuppressWarnings("unchecked")
@Override
protected void doRun() throws Exception {
try {
holder.getPersistentAction().nodeOperation(task, request, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
}
});
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.Collections;
import java.util.Map;
/**
* Components that registers all persistent actions
*/
public class PersistentActionRegistry extends AbstractComponent {
private volatile Map<String, PersistentActionHolder<?>> actions = Collections.emptyMap();
private final Object actionHandlerMutex = new Object();
public PersistentActionRegistry(Settings settings) {
super(settings);
}
public <Request extends PersistentActionRequest> void registerPersistentAction(String action,
TransportPersistentAction<Request> persistentAction) {
registerPersistentAction(new PersistentActionHolder<>(action, persistentAction, persistentAction.getExecutor()));
}
private <Request extends PersistentActionRequest> void registerPersistentAction(
PersistentActionHolder<Request> reg) {
synchronized (actionHandlerMutex) {
PersistentActionHolder<?> replaced = actions.get(reg.getAction());
actions = MapBuilder.newMapBuilder(actions).put(reg.getAction(), reg).immutableMap();
if (replaced != null) {
logger.warn("registered two handlers for persistent action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
}
}
}
public void removeHandler(String action) {
synchronized (actionHandlerMutex) {
actions = MapBuilder.newMapBuilder(actions).remove(action).immutableMap();
}
}
@SuppressWarnings("unchecked")
public <Request extends PersistentActionRequest> PersistentActionHolder<Request> getPersistentActionHolderSafe(String action) {
PersistentActionHolder<Request> holder = (PersistentActionHolder<Request>) actions.get(action);
if (holder == null) {
throw new IllegalStateException("Unknown persistent action [" + action + "]");
}
return holder;
}
public <Request extends PersistentActionRequest>
TransportPersistentAction<Request> getPersistentActionSafe(String action) {
PersistentActionHolder<Request> holder = getPersistentActionHolderSafe(action);
return holder.getPersistentAction();
}
public static final class PersistentActionHolder<Request extends PersistentActionRequest> {
private final String action;
private final TransportPersistentAction<Request> persistentAction;
private final String executor;
public PersistentActionHolder(String action, TransportPersistentAction<Request> persistentAction, String executor) {
this.action = action;
this.persistentAction = persistentAction;
this.executor = executor;
}
public String getAction() {
return action;
}
public TransportPersistentAction<Request> getPersistentAction() {
return persistentAction;
}
public String getExecutor() {
return executor;
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
* Base class for a request for a persistent action
*/
public abstract class PersistentActionRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new PersistentTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
/**
* Response upon a successful start or an persistent action
*/
public class PersistentActionResponse extends ActionResponse {
private long taskId;
public PersistentActionResponse() {
super();
}
public PersistentActionResponse(long taskId) {
this.taskId = taskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
}
public long getTaskId() {
return taskId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentActionResponse that = (PersistentActionResponse) o;
return taskId == that.taskId;
}
@Override
public int hashCode() {
return Objects.hash(taskId);
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.TaskId;
/**
* Service responsible for executing restartable actions that can survive disappearance of a coordinating and executor nodes.
*/
public class PersistentActionService extends AbstractComponent {
private final Client client;
private final ClusterService clusterService;
public PersistentActionService(Settings settings, ClusterService clusterService, Client client) {
super(settings);
this.client = client;
this.clusterService = clusterService;
}
public <Request extends PersistentActionRequest> void sendRequest(String action, Request request,
ActionListener<PersistentActionResponse> listener) {
StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(action, request);
try {
client.execute(StartPersistentTaskAction.INSTANCE, startRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
public void sendCompletionNotification(long taskId, Exception failure,
ActionListener<CompletionPersistentTaskAction.Response> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
public void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
DiscoveryNode localNode = clusterService.localNode();
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
cancelTasksRequest.setReason("persistent action was removed");
try {
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
* Task that returns additional state information
*/
public class PersistentTask extends CancellableTask {
private Provider<Status> statusProvider;
public PersistentTask(long id, String type, String action, String description, TaskId parentTask) {
super(id, type, action, description, parentTask);
}
@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
@Override
public Status getStatus() {
Provider<Status> statusProvider = this.statusProvider;
if (statusProvider != null) {
return statusProvider.get();
} else {
return null;
}
}
public void setStatusProvider(Provider<Status> statusProvider) {
assert this.statusProvider == null;
this.statusProvider = statusProvider;
}
}

View File

@ -0,0 +1,231 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.transport.TransportResponse.Empty;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Component that runs only on the master node and is responsible for assigning running tasks to nodes
*/
public class PersistentTaskClusterService extends AbstractComponent implements ClusterStateListener {
private final ClusterService clusterService;
private final PersistentActionRegistry registry;
public PersistentTaskClusterService(Settings settings, PersistentActionRegistry registry, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
clusterService.addListener(this);
this.registry = registry;
}
/**
* Creates a new persistent task on master node
*
* @param action the action name
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentActionRequest> void createPersistentTask(String action, Request request,
ActionListener<Long> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final String executorNodeId = executorNode(action, currentState, request);
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
final long nextId;
if (tasksInProgress != null) {
nextId = tasksInProgress.getCurrentId() + 1;
currentTasks.addAll(tasksInProgress.entries());
} else {
nextId = 1;
}
currentTasks.add(new PersistentTaskInProgress<>(nextId, action, request, executorNodeId));
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(nextId, currentTasks)).build();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(((PersistentTasksInProgress) newState.custom(PersistentTasksInProgress.TYPE)).getCurrentId());
}
});
}
/**
* Restarts a record about a running persistent task from cluster state
*
* @param id the id of a persistent task
* @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed
*/
public void completeOrRestartPersistentTask(long id, Exception failure, ActionListener<Empty> listener) {
final String source;
if (failure != null) {
logger.warn("persistent task " + id + " failed, restarting", failure);
source = "restart persistent task";
} else {
source = "finish persistent task";
}
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE);
if (tasksInProgress == null) {
// Nothing to do, the task was already deleted
return currentState;
}
boolean found = false;
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (taskInProgress.getId() == id) {
assert found == false;
found = true;
if (failure != null) {
// If the task failed - we need to restart it on another node, otherwise we just remove it
String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest());
currentTasks.add(new PersistentTaskInProgress<>(taskInProgress, executorNode));
}
} else {
currentTasks.add(taskInProgress);
}
}
if (found) {
ClusterState.Builder builder = ClusterState.builder(currentState);
PersistentTasksInProgress tasks = new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks);
return builder.putCustom(PersistentTasksInProgress.TYPE, tasks).build();
} else {
return currentState;
}
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(Empty.INSTANCE);
}
});
}
private <Request extends PersistentActionRequest> String executorNode(String action, ClusterState currentState, Request request) {
TransportPersistentAction<Request> persistentAction = registry.getPersistentActionSafe(action);
persistentAction.validate(request, currentState);
DiscoveryNode executorNode = persistentAction.executorNode(request, currentState);
final String executorNodeId;
if (executorNode == null) {
// The executor node not available yet, we will create task with empty executor node and try
// again later
executorNodeId = null;
} else {
executorNodeId = executorNode.getId();
}
return executorNodeId;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) {
PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE);
if (tasks != null && (event.nodesChanged() || event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
// We need to check if removed nodes were running any of the tasks and reassign them
boolean reassignmentRequired = false;
Set<String> removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
for (PersistentTaskInProgress<?> taskInProgress : tasks.entries()) {
if (taskInProgress.getExecutorNode() == null) {
// there is an unassigned task - we need to try assigning it
reassignmentRequired = true;
break;
}
if (removedNodes.contains(taskInProgress.getExecutorNode())) {
// The caller node disappeared, we need to assign a new caller node
reassignmentRequired = true;
break;
}
}
if (reassignmentRequired) {
reassignTasks();
}
}
}
}
/**
* Evaluates the cluster state and tries to assign tasks to nodes
*/
public void reassignTasks() {
clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksInProgress tasks = currentState.custom(PersistentTasksInProgress.TYPE);
ClusterState newClusterState = currentState;
DiscoveryNodes nodes = currentState.nodes();
if (tasks != null) {
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTaskInProgress<?> task : tasks.entries()) {
if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) {
// there is an unassigned task - we need to try assigning it
String executorNode = executorNode(task.getAction(), currentState, task.getRequest());
if (Objects.equals(executorNode, task.getExecutorNode()) == false) {
PersistentTasksInProgress tasksInProgress = newClusterState.custom(PersistentTasksInProgress.TYPE);
final List<PersistentTaskInProgress<?>> currentTasks = new ArrayList<>();
for (PersistentTaskInProgress<?> taskInProgress : tasksInProgress.entries()) {
if (task.getId() == taskInProgress.getId()) {
currentTasks.add(new PersistentTaskInProgress<>(task, executorNode));
} else {
currentTasks.add(taskInProgress);
}
}
newClusterState = ClusterState.builder(newClusterState).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(tasksInProgress.getCurrentId(), currentTasks)).build();
}
}
}
}
return newClusterState;
}
@Override
public void onFailure(String source, Exception e) {
logger.warn("Unsuccessful persistent task reassignment", e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
});
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* A cluster state record that contains a list of all running persistent tasks
*/
public final class PersistentTasksInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final String TYPE = "persistent_tasks";
// TODO: Implement custom Diff for entries
private final List<PersistentTaskInProgress<?>> entries;
private final long currentId;
public PersistentTasksInProgress(long currentId, List<PersistentTaskInProgress<?>> entries) {
this.currentId = currentId;
this.entries = entries;
}
public List<PersistentTaskInProgress<?>> entries() {
return this.entries;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTasksInProgress that = (PersistentTasksInProgress) o;
return currentId == that.currentId &&
Objects.equals(entries, that.entries);
}
@Override
public int hashCode() {
return Objects.hash(entries, currentId);
}
public long getNumberOfTasksOnNode(String nodeId, String action) {
return entries.stream().filter(task -> action.equals(task.action) && nodeId.equals(task.executorNode)).count();
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_3_0_UNRELEASED;
}
/**
* A record that represents a single running persistent task
*/
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable {
private final long id;
private final long allocationId;
private final String action;
private final Request request;
@Nullable
private final String executorNode;
public PersistentTaskInProgress(long id, String action, Request request, String executorNode) {
this(id, 0L, action, request, executorNode);
}
public PersistentTaskInProgress(PersistentTaskInProgress<Request> persistentTaskInProgress, String newExecutorNode) {
this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L,
persistentTaskInProgress.action, persistentTaskInProgress.request, newExecutorNode);
}
private PersistentTaskInProgress(long id, long allocationId, String action, Request request, String executorNode) {
this.id = id;
this.allocationId = allocationId;
this.action = action;
this.request = request;
this.executorNode = executorNode;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
}
@SuppressWarnings("unchecked")
private PersistentTaskInProgress(StreamInput in) throws IOException {
id = in.readLong();
allocationId = in.readLong();
action = in.readString();
request = (Request) in.readNamedWriteable(PersistentActionRequest.class);
executorNode = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
out.writeLong(allocationId);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeOptionalString(executorNode);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTaskInProgress<?> that = (PersistentTaskInProgress<?>) o;
return id == that.id &&
allocationId == that.allocationId &&
Objects.equals(action, that.action) &&
Objects.equals(request, that.request) &&
Objects.equals(executorNode, that.executorNode);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, action, request, executorNode);
}
public long getId() {
return id;
}
public long getAllocationId() {
return allocationId;
}
public String getAction() {
return action;
}
public Request getRequest() {
return request;
}
@Nullable
public String getExecutorNode() {
return executorNode;
}
}
@Override
public String getWriteableName() {
return TYPE;
}
public PersistentTasksInProgress(StreamInput in) throws IOException {
currentId = in.readLong();
entries = in.readList(PersistentTaskInProgress::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(currentId);
out.writeList(entries);
}
public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(ClusterState.Custom.class, TYPE, in);
}
public long getCurrentId() {
return currentId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId);
builder.startArray("running_tasks");
for (PersistentTaskInProgress<?> entry : entries) {
toXContent(entry, builder, params);
}
builder.endArray();
return builder;
}
public void toXContent(PersistentTaskInProgress<?> entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
{
builder.field("uuid", entry.id);
builder.field("action", entry.action);
builder.field("request");
entry.request.toXContent(builder, params);
builder.field("executor_node", entry.executorNode);
}
builder.endObject();
}
}

View File

@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
public class RemovePersistentTaskAction extends Action<RemovePersistentTaskAction.Request,
RemovePersistentTaskAction.Response,
RemovePersistentTaskAction.RequestBuilder> {
public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/remove";
private RemovePersistentTaskAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
public Request() {
}
public Request(long taskId) {
this.taskId = taskId;
}
public void setTaskId(long taskId) {
this.taskId = taskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId;
}
@Override
public int hashCode() {
return Objects.hash(taskId);
}
}
public static class Response extends AcknowledgedResponse {
protected Response() {
super();
}
public Response(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}
@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<RemovePersistentTaskAction.Request,
RemovePersistentTaskAction.Response, RemovePersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, RemovePersistentTaskAction action) {
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
request.setTaskId(taskId);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, RemovePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
}
@Override
protected String executor() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, null, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* Internal action used by TransportPersistentAction to add the record for the persistent action to the cluster state.
*/
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
PersistentActionResponse,
StartPersistentTaskAction.RequestBuilder> {
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/start";
private StartPersistentTaskAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
public static class Request extends MasterNodeRequest<Request> {
private String action;
private PersistentActionRequest request;
public Request() {
}
public Request(String action, PersistentActionRequest request) {
this.action = action;
this.request = request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
action = in.readString();
request = in.readOptionalNamedWriteable(PersistentActionRequest.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeOptionalNamedWriteable(request);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request);
}
@Override
public int hashCode() {
return Objects.hash(action, request);
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
PersistentActionResponse, StartPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
super(client, action, new Request());
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentActionResponse> {
private final PersistentTaskClusterService persistentTaskClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
PersistentActionRegistry persistentActionRegistry,
PersistentActionService persistentActionService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
PersistentActionExecutor executor = new PersistentActionExecutor(threadPool);
clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry,
transportService.getTaskManager(), executor));
}
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Override
protected PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentActionResponse> listener) {
persistentTaskClusterService.createPersistentTask(request.action, request.request, new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {
listener.onResponse(new PersistentActionResponse(newTaskId));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* An action that can survive restart of requesting or executing node.
* These actions are using cluster state rather than only transport service to send requests and responses.
*/
public abstract class TransportPersistentAction<Request extends PersistentActionRequest>
extends HandledTransportAction<Request, PersistentActionResponse> {
private final String executor;
private final PersistentActionService persistentActionService;
protected TransportPersistentAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, PersistentActionService persistentActionService,
PersistentActionRegistry persistentActionRegistry,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> requestSupplier, String executor) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
requestSupplier);
this.executor = executor;
this.persistentActionService = persistentActionService;
persistentActionRegistry.registerPersistentAction(actionName, this);
}
/**
* Returns the node id where the request has to be executed,
* <p>
* The default implementation returns the least loaded data node
*/
public DiscoveryNode executorNode(Request request, ClusterState clusterState) {
return selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode);
}
/**
* Finds the least loaded node that satisfies the selector criteria
*/
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate<DiscoveryNode> selector) {
long minLoad = Long.MAX_VALUE;
DiscoveryNode minLoadedNode = null;
PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE);
for (DiscoveryNode node : clusterState.getNodes()) {
if (selector.test(node)) {
if (persistentTasksInProgress == null) {
// We don't have any task running yet, pick the first available node
return node;
}
long numberOfTasks = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), actionName);
if (minLoad > numberOfTasks) {
minLoad = numberOfTasks;
minLoadedNode = node;
}
}
}
return minLoadedNode;
}
/**
* Checks the current cluster state for compatibility with the request
* <p>
* Throws an exception if the supplied request cannot be executed on the cluster in the current state.
*/
public void validate(Request request, ClusterState clusterState) {
}
@Override
protected void doExecute(Request request, ActionListener<PersistentActionResponse> listener) {
persistentActionService.sendRequest(actionName, request, listener);
}
/**
* This operation will be executed on the executor node.
*
* If nodeOperation throws an exception or triggers listener.onFailure() method, the task will be restarted,
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
* completed and will be removed from the cluster state and not restarted.
*/
protected abstract void nodeOperation(PersistentTask task, Request request, ActionListener<Empty> listener);
public String getExecutor() {
return executor;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/**
* The Persistent Actions are actions responsible for executing restartable actions that can survive disappearance of a
* coordinating and executor nodes.
* <p>
* In order to be resilient to node restarts, the persistent actions are using the cluster state instead of a transport service to send
* requests and responses. The execution is done in six phases:
* <p>
* 1. The coordinating node sends an ordinary transport request to the master node to start a new persistent action. This action is handled
* by the {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which is using
* {@link org.elasticsearch.xpack.persistent.PersistentTaskClusterService} to update cluster state with the record about running persistent
* task.
* <p>
* 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} in the cluster state to indicate that
* there is a new persistent action
* running in the system.
* <p>
* 3. The {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} running on every node in the cluster monitors changes in
* the cluster state and starts execution of all new actions assigned to the node it is running on.
* <p>
* 4. If the action fails to start on the node, the {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} uses the
* {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} to notify the
* {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which reassigns the action to another node in the cluster.
* <p>
* 5. If action finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the
* cluster state.
* <p>
* 6. The {@link org.elasticsearch.xpack.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent action.
*/
package org.elasticsearch.xpack.persistent;

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class CancelPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomLong());
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Response;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class CancelPersistentTaskResponseTests extends AbstractStreamableTestCase<Response> {
@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}
@Override
protected Response createBlankInstance() {
return new Response();
}
}

View File

@ -0,0 +1,367 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PersistentActionCoordinatorTests extends ESTestCase {
private ClusterService createClusterService() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
return new ClusterService(Settings.builder().put("cluster.name", "PersistentActionExecutorTests").build(),
clusterSettings, null, () -> new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(),
Version.CURRENT));
}
private DiscoveryNodes createTestNodes(int nonLocalNodesCount, Settings settings) {
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node"));
for (int i = 0; i < nonLocalNodesCount; i++) {
nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
}
nodes.localNodeId("this_node");
return nodes.build();
}
public void testStartTask() throws Exception {
ClusterService clusterService = createClusterService();
PersistentActionService persistentActionService = mock(PersistentActionService.class);
PersistentActionRegistry registry = new PersistentActionRegistry(Settings.EMPTY);
@SuppressWarnings("unchecked") TransportPersistentAction<TestRequest> action = mock(TransportPersistentAction.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
registry.registerPersistentAction("test", action);
int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
PersistentActionCoordinator coordinator = new PersistentActionCoordinator(Settings.EMPTY, persistentActionService,
registry, new TaskManager(Settings.EMPTY), executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
List<PersistentTaskInProgress<?>> tasks = new ArrayList<>();
long taskId = randomLong();
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.add(new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i),
"other_node_" + randomInt(nonLocalNodesCount)));
taskId++;
if (added == false && randomBoolean()) {
added = true;
tasks.add(new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node"));
taskId++;
}
}
}
if (added == false) {
logger.info("No local node action was added");
}
ClusterState newClusterState = ClusterState.builder(state)
.putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(taskId, tasks)).build();
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
if (added) {
// Action for this node was added, let's make sure it was invoked
assertThat(executor.executions.size(), equalTo(1));
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
assertThat(executor.executions.size(), equalTo(1));
// Start another task on this node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest("this_param"), "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action was called this time
assertThat(executor.size(), equalTo(2));
// Finish both tasks
executor.get(0).listener.onFailure(new RuntimeException());
executor.get(1).listener.onResponse(Empty.INSTANCE);
long failedTaskId = executor.get(0).task.getParentTaskId().getId();
long finishedTaskId = executor.get(1).task.getParentTaskId().getId();
executor.clear();
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
assertThat(executor.size(), equalTo(0));
// Simulate reallocation of the failed task on the same node
state = newClusterState;
newClusterState = reallocateTask(state, failedTaskId, "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Simulate removal of the finished task
state = newClusterState;
newClusterState = removeTask(state, finishedTaskId);
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action was only allocated on this node once
assertThat(executor.size(), equalTo(1));
}
}
public void testTaskCancellation() {
ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, null, null) {
@Override
public void sendCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
capturedTaskId.set(taskId);
capturedListener.set(listener);
}
};
PersistentActionRegistry registry = new PersistentActionRegistry(Settings.EMPTY);
@SuppressWarnings("unchecked") TransportPersistentAction<TestRequest> action = mock(TransportPersistentAction.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
registry.registerPersistentAction("test", action);
int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
TaskManager taskManager = new TaskManager(Settings.EMPTY);
PersistentActionCoordinator coordinator = new PersistentActionCoordinator(Settings.EMPTY, persistentActionService,
registry, taskManager, executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
ClusterState newClusterState = state;
// Allocate first task
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Check the the task is know to the task manager
assertThat(taskManager.getTasks().size(), equalTo(1));
Task runningTask = taskManager.getTasks().values().iterator().next();
long persistentId = runningTask.getParentTaskId().getId();
long localId = runningTask.getId();
// Make sure it returns correct status
Task.Status status = runningTask.getStatus();
assertThat(status.toString(), equalTo("{\"state\":\"STARTED\"}"));
state = newClusterState;
// Relocate the task to some other node or remove it completely
if (randomBoolean()) {
newClusterState = reallocateTask(state, persistentId, "some_other_node");
} else {
newClusterState = removeTask(state, persistentId);
}
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure it returns correct status
assertThat(taskManager.getTasks().size(), equalTo(1));
assertThat(taskManager.getTasks().values().iterator().next().getStatus().toString(), equalTo("{\"state\":\"CANCELLED\"}"));
// That should trigger cancellation request
assertThat(capturedTaskId.get(), equalTo(localId));
// Notify successful cancellation
capturedListener.get().onResponse(new CancelTasksResponse());
// finish or fail task
if (randomBoolean()) {
executor.get(0).listener.onResponse(Empty.INSTANCE);
} else {
executor.get(0).listener.onFailure(new IOException("test"));
}
// Check the the task is now removed from task manager
assertThat(taskManager.getTasks().values(), empty());
}
public void testNotificationFailure() {
ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(-1L);
AtomicReference<Exception> capturedException = new AtomicReference<>();
AtomicReference<ActionListener<Response>> capturedListener = new AtomicReference<>();
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, null) {
@Override
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<Response> listener) {
capturedTaskId.set(taskId);
capturedException.set(failure);
capturedListener.set(listener);
}
};
PersistentActionRegistry registry = new PersistentActionRegistry(Settings.EMPTY);
@SuppressWarnings("unchecked") TransportPersistentAction<TestRequest> action = mock(TransportPersistentAction.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
registry.registerPersistentAction("test", action);
int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
TaskManager taskManager = new TaskManager(Settings.EMPTY);
PersistentActionCoordinator coordinator = new PersistentActionCoordinator(Settings.EMPTY, persistentActionService,
registry, taskManager, executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
.build();
ClusterState newClusterState = state;
// Allocate first task
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Fail the task
executor.get(0).listener.onFailure(new RuntimeException("test failure"));
// Check that notification was sent
assertThat(capturedException.get().getMessage(), equalTo("test failure"));
capturedException.set(null);
// Simulate failure to notify
capturedListener.get().onFailure(new IOException("simulated notification failure"));
// Allocate another task
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Check that notification was sent again
assertThat(capturedException.get().getMessage(), equalTo("test failure"));
// Check the the task is still known by the task manager
assertThat(taskManager.getTasks().size(), equalTo(1));
long id = taskManager.getTasks().values().iterator().next().getParentTaskId().getId();
// This time acknowledge notification
capturedListener.get().onResponse(new Response());
// Reallocate failed task to another node
state = newClusterState;
newClusterState = reallocateTask(state, id, "other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Check the the task is now removed from task manager
assertThat(taskManager.getTasks().values(), empty());
}
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
tasks.add(new PersistentTaskInProgress<>(prevTasks == null ? 0 : prevTasks.getCurrentId(), action, request, node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i).getId() == taskId) {
tasks.set(i, new PersistentTaskInProgress<>(tasks.get(i), node));
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
}
fail("didn't find task with id " + taskId);
return null;
}
private ClusterState removeTask(ClusterState state, long taskId) {
PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE);
List<PersistentTaskInProgress<?>> tasks = prevTasks == null ? new ArrayList<>() : new ArrayList<>(prevTasks.entries());
for (int i = 0; i < tasks.size(); i++) {
if (tasks.get(i).getId() == taskId) {
tasks.remove(i);
return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE,
new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build();
}
}
fail("didn't find task with id " + taskId);
return null;
}
private class Execution {
private final PersistentActionRequest request;
private final PersistentTask task;
private final PersistentActionRegistry.PersistentActionHolder<?> holder;
private final ActionListener<Empty> listener;
public Execution(PersistentActionRequest request, PersistentTask task, PersistentActionRegistry.PersistentActionHolder<?> holder,
ActionListener<Empty> listener) {
this.request = request;
this.task = task;
this.holder = holder;
this.listener = listener;
}
}
private class MockExecutor extends PersistentActionExecutor {
private List<Execution> executions = new ArrayList<>();
public MockExecutor() {
super(null);
}
@Override
public <Request extends PersistentActionRequest> void executeAction(Request request, PersistentTask task,
PersistentActionRegistry.PersistentActionHolder<Request> holder,
ActionListener<Empty> listener) {
executions.add(new Execution(request, task, holder, listener));
}
public Execution get(int i) {
return executions.get(i);
}
public int size() {
return executions.size();
}
public void clear() {
executions.clear();
}
}
}

View File

@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestTasksRequestBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class PersistentActionIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestPersistentActionPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return super.getMockPlugins();
}
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.build();
}
public void testPersistentActionRestart() throws Exception {
long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
logger.info("Failing the running task");
// Fail the running task and make sure it restarts properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("fail").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
assertBusy(() -> {
// Wait for the task to restart
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(1));
// Make sure that restarted task is different
assertThat(tasks.get(0).getTaskId(), not(equalTo(firstRunningTask.getTaskId())));
});
logger.info("Removing persistent task with id {}", firstRunningTask.getId());
// Remove the persistent task
assertAcked(RemovePersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get());
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
assertBusy(() -> {
// Wait for the task to disappear completely
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks(),
empty());
});
}
public void testPersistentActionCompletion() throws Exception {
long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
if (randomBoolean()) {
logger.info("Completing the running task");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
} else {
logger.info("Cancelling the running task");
// Cancel the running task and make sure it finishes properly
assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
}
assertBusy(() -> {
// Wait for the task to finish
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get()
.getTasks();
logger.info("Found {} tasks", tasks.size());
assertThat(tasks.size(), equalTo(0));
// Make sure the task is removed from the cluster state
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE))
.entries(), empty());
});
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
long taskId = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah")
.executorNodeAttr("test").get().getTaskId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings);
String newNodeId = internalCluster().clusterService(newNode).localNode().getId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(),
equalTo(1));
});
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
.get().getTasks().get(0);
// Verifying the the task runs on the new node
assertThat(taskInfo.getTaskId().getNodeId(), equalTo(newNodeId));
internalCluster().stopRandomNode(settings -> "test".equals(settings.get("node.attr.test_attr")));
assertBusy(() -> {
// Wait for the task to disappear completely
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks(),
empty());
});
// Remove the persistent task
assertAcked(RemovePersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get());
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class PersistentActionRegistryTests extends ESTestCase {
public void testActionLookup() {
PersistentActionRegistry registry = new PersistentActionRegistry(Settings.EMPTY);
TransportPersistentAction<?> action1 = mock(TransportPersistentAction.class);
when(action1.getExecutor()).thenReturn(ThreadPool.Names.MANAGEMENT);
TransportPersistentAction<?> action2 = mock(TransportPersistentAction.class);
when(action2.getExecutor()).thenReturn(ThreadPool.Names.GENERIC);
registry.registerPersistentAction("test1", action1);
registry.registerPersistentAction("test2", action2);
assertEquals(registry.getPersistentActionHolderSafe("test1").getAction(), "test1");
assertEquals(registry.getPersistentActionHolderSafe("test1").getExecutor(), ThreadPool.Names.MANAGEMENT);
assertEquals(registry.getPersistentActionHolderSafe("test1").getPersistentAction(), action1);
assertEquals(registry.getPersistentActionSafe("test1"), action1);
assertEquals(registry.getPersistentActionHolderSafe("test2").getAction(), "test2");
assertEquals(registry.getPersistentActionHolderSafe("test2").getExecutor(), ThreadPool.Names.GENERIC);
assertEquals(registry.getPersistentActionHolderSafe("test2").getPersistentAction(), action2);
assertEquals(registry.getPersistentActionSafe("test2"), action2);
try {
registry.getPersistentActionHolderSafe("test3");
fail("Should have failed");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Unknown persistent action [test3]");
}
try {
registry.getPersistentActionSafe("test3");
fail("Should have failed");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Unknown persistent action [test3]");
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class PersistentActionResponseTests extends AbstractStreamableTestCase<PersistentActionResponse> {
@Override
protected PersistentActionResponse createTestInstance() {
return new PersistentActionResponse(randomLong());
}
@Override
protected PersistentActionResponse createBlankInstance() {
return new PersistentActionResponse();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase<PersistentTasksInProgress> {
@Override
protected PersistentTasksInProgress createTestInstance() {
int numberOfTasks = randomInt(10);
List<PersistentTasksInProgress.PersistentTaskInProgress<?>> entries = new ArrayList<>();
for (int i = 0; i < numberOfTasks; i++) {
entries.add(new PersistentTasksInProgress.PersistentTaskInProgress<>(
randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)),
randomAsciiOfLength(10)));
}
return new PersistentTasksInProgress(randomLong(), entries);
}
@Override
protected Writeable.Reader<PersistentTasksInProgress> instanceReader() {
return PersistentTasksInProgress::new;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new)
));
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase;
public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomLong(), null);
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction.Request;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction;
import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest;
import org.elasticsearch.test.AbstractStreamableTestCase;
import java.util.Collections;
public class StartPersistentActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
TestRequest testRequest = new TestRequest();
if (randomBoolean()) {
testRequest.setTestParam(randomAsciiOfLengthBetween(1, 20));
}
if (randomBoolean()) {
testRequest.setParentTask(randomAsciiOfLengthBetween(1, 20), randomLong());
}
if (randomBoolean()) {
testRequest.setExecutorNodeAttr(randomAsciiOfLengthBetween(1, 20));
}
return new Request(randomAsciiOfLengthBetween(1, 20), new TestRequest());
}
@Override
protected Request createBlankInstance() {
return new Request();
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new)
));
}
}

View File

@ -0,0 +1,452 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TasksRequestBuilder;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* A plugin that adds a test persistent task.
*/
public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class),
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
);
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, clusterService, client);
PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY);
return Arrays.asList(
persistentActionService,
persistentActionRegistry,
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService)
);
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
new NamedWriteableRegistry.Entry(PersistentActionCoordinator.Status.class,
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom)
);
}
public static class TestRequest extends PersistentActionRequest {
private String executorNodeAttr = null;
private String responseNode = null;
private String testParam = null;
public TestRequest() {
}
public TestRequest(String testParam) {
this.testParam = testParam;
}
public TestRequest(StreamInput in) throws IOException {
readFrom(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public String getWriteableName() {
return TestPersistentAction.NAME;
}
public void setExecutorNodeAttr(String executorNodeAttr) {
this.executorNodeAttr = executorNodeAttr;
}
public void setTestParam(String testParam) {
this.testParam = testParam;
}
public String getExecutorNodeAttr() {
return executorNodeAttr;
}
public String getTestParam() {
return testParam;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(executorNodeAttr);
out.writeOptionalString(responseNode);
out.writeOptionalString(testParam);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
executorNodeAttr = in.readOptionalString();
responseNode = in.readOptionalString();
testParam = in.readOptionalString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestRequest that = (TestRequest) o;
return Objects.equals(executorNodeAttr, that.executorNodeAttr) &&
Objects.equals(responseNode, that.responseNode) &&
Objects.equals(testParam, that.testParam);
}
@Override
public int hashCode() {
return Objects.hash(executorNodeAttr, responseNode, testParam);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new TestTask(id, type, action, getDescription(), parentTaskId);
}
}
public static class TestPersistentTaskRequestBuilder extends
ActionRequestBuilder<TestRequest, PersistentActionResponse, TestPersistentTaskRequestBuilder> {
protected TestPersistentTaskRequestBuilder(ElasticsearchClient client, Action<TestRequest, PersistentActionResponse,
TestPersistentTaskRequestBuilder> action, TestRequest request) {
super(client, action, request);
}
public TestPersistentTaskRequestBuilder testParam(String testParam) {
request.setTestParam(testParam);
return this;
}
public TestPersistentTaskRequestBuilder executorNodeAttr(String targetNode) {
request.setExecutorNodeAttr(targetNode);
return this;
}
}
public static class TestPersistentAction extends Action<TestRequest, PersistentActionResponse, TestPersistentTaskRequestBuilder> {
public static final TestPersistentAction INSTANCE = new TestPersistentAction();
public static final String NAME = "cluster:admin/persistent/test";
private TestPersistentAction() {
super(NAME);
}
@Override
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
}
@Override
public TestPersistentTaskRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new TestPersistentTaskRequestBuilder(client, this, new TestRequest());
}
}
public static class TransportTestPersistentAction extends TransportPersistentAction<TestRequest> {
private final TransportService transportService;
@Inject
public TransportTestPersistentAction(Settings settings, ThreadPool threadPool, TransportService transportService,
PersistentActionService persistentActionService,
PersistentActionRegistry persistentActionRegistry, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, TestPersistentAction.NAME, false, threadPool, transportService, persistentActionService,
persistentActionRegistry, actionFilters, indexNameExpressionResolver, TestRequest::new,
ThreadPool.Names.MANAGEMENT);
this.transportService = transportService;
}
@Override
public DiscoveryNode executorNode(TestRequest request, ClusterState clusterState) {
if (request.getExecutorNodeAttr() == null) {
return super.executorNode(request, clusterState);
} else {
return selectLeastLoadedNode(clusterState,
discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
}
}
@Override
protected void nodeOperation(PersistentTask task, TestRequest request, ActionListener<Empty> listener) {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
assertTrue(awaitBusy(() -> testTask.isCancelled() ||
testTask.getOperation() != null ||
transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes
if (transportService.lifecycleState() == Lifecycle.State.STARTED) {
if ("finish".equals(testTask.getOperation())) {
listener.onResponse(Empty.INSTANCE);
} else if ("fail".equals(testTask.getOperation())) {
listener.onFailure(new RuntimeException("Simulating failure"));
} else if (testTask.isCancelled()) {
// Cancellation make cause different ways for the task to finish
if (randomBoolean()) {
if (randomBoolean()) {
listener.onFailure(new TaskCancelledException(testTask.getReasonCancelled()));
} else {
listener.onResponse(Empty.INSTANCE);
}
} else {
listener.onFailure(new RuntimeException(testTask.getReasonCancelled()));
}
} else {
fail("We really shouldn't be here");
}
}
} catch (InterruptedException e) {
listener.onFailure(e);
}
}
}
public static class TestTaskAction extends Action<TestTasksRequest, TestTasksResponse, TestTasksRequestBuilder> {
public static final TestTaskAction INSTANCE = new TestTaskAction();
public static final String NAME = "cluster:admin/persistent/task_test";
private TestTaskAction() {
super(NAME);
}
@Override
public TestTasksResponse newResponse() {
return new TestTasksResponse();
}
@Override
public TestTasksRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new TestTasksRequestBuilder(client);
}
}
public static class TestTask extends PersistentTask {
private volatile String operation;
public TestTask(long id, String type, String action, String description, TaskId parentTask) {
super(id, type, action, description, parentTask);
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
}
static class TestTaskResponse implements Writeable {
public TestTaskResponse() {
}
public TestTaskResponse(StreamInput in) throws IOException {
in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
}
}
public static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest> {
private String operation;
public TestTasksRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operation = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(operation);
}
public void setOperation(String operation) {
this.operation = operation;
}
public String getOperation() {
return operation;
}
}
public static class TestTasksRequestBuilder extends TasksRequestBuilder<TestTasksRequest, TestTasksResponse, TestTasksRequestBuilder> {
protected TestTasksRequestBuilder(ElasticsearchClient client) {
super(client, TestTaskAction.INSTANCE, new TestTasksRequest());
}
public TestTasksRequestBuilder setOperation(String operation) {
request.setOperation(operation);
return this;
}
}
public static class TestTasksResponse extends BaseTasksResponse {
private List<TestTaskResponse> tasks;
public TestTasksResponse() {
}
public TestTasksResponse(List<TestTaskResponse> tasks, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasks = in.readList(TestTaskResponse::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(tasks);
}
public List<TestTaskResponse> getTasks() {
return tasks;
}
}
public static class TransportTestTaskAction extends TransportTasksAction<TestTask,
TestTasksRequest, TestTasksResponse, TestTaskResponse> {
@Inject
public TransportTestTaskAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, String nodeExecutor) {
super(settings, TestTaskAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected TestTasksResponse newResponse(TestTasksRequest request, List<TestTaskResponse> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new TestTaskResponse(in);
}
@Override
protected void taskOperation(TestTasksRequest request, TestTask task, ActionListener<TestTaskResponse> listener) {
task.setOperation(request.operation);
listener.onResponse(new TestTaskResponse());
}
@Override
protected boolean accumulateExceptions() {
return false;
}
}
}