listener) {
- persistentActionService.sendRequest(actionName, request, listener);
- }
-
/**
* Updates the persistent task status in the cluster state.
*
@@ -104,10 +94,10 @@ public abstract class TransportPersistentAction listener) {
- persistentActionService.updateStatus(task.getPersistentTaskId(), status,
- new ActionListener() {
+ persistentTasksService.updateStatus(task.getPersistentTaskId(), status,
+ new PersistentTaskOperationListener() {
@Override
- public void onResponse(UpdatePersistentTaskStatusAction.Response response) {
+ public void onResponse(long taskId) {
listener.onResponse(Empty.INSTANCE);
}
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java
new file mode 100644
index 00000000000..25781c66a7d
--- /dev/null
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorRegistry.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.persistent;
+
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Components that registers all persistent task executors
+ */
+public class PersistentTasksExecutorRegistry extends AbstractComponent {
+
+ private final Map> taskExecutors;
+
+ @SuppressWarnings("unchecked")
+ public PersistentTasksExecutorRegistry(Settings settings, Collection> taskExecutors) {
+ super(settings);
+ Map> map = new HashMap<>();
+ for (PersistentTasksExecutor> executor : taskExecutors) {
+ map.put(executor.getTaskName(), executor);
+ }
+ this.taskExecutors = Collections.unmodifiableMap(map);
+ }
+
+ @SuppressWarnings("unchecked")
+ public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) {
+ PersistentTasksExecutor executor = (PersistentTasksExecutor) taskExecutors.get(taskName);
+ if (executor == null) {
+ throw new IllegalStateException("Unknown persistent executor [" + taskName + "]");
+ }
+ return executor;
+ }
+}
\ No newline at end of file
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java
similarity index 80%
rename from plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java
rename to plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java
index 34156bb5017..063bce14ca4 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java
@@ -9,7 +9,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
-import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.Nullable;
@@ -19,12 +18,15 @@ import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
-import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
+import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException;
import java.util.HashMap;
@@ -37,33 +39,35 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
/**
- * This component is responsible for coordination of execution of persistent actions on individual nodes. It runs on all
+ * This component is responsible for coordination of execution of persistent tasks on individual nodes. It runs on all
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
*/
-public class PersistentActionCoordinator extends AbstractComponent implements ClusterStateListener {
+public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
private final Map runningTasks = new HashMap<>();
- private final PersistentActionService persistentActionService;
- private final PersistentActionRegistry persistentActionRegistry;
+ private final PersistentTasksService persistentTasksService;
+ private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
private final TaskManager taskManager;
- private final PersistentActionExecutor persistentActionExecutor;
+ private final ThreadPool threadPool;
+ private final NodePersistentTasksExecutor nodePersistentTasksExecutor;
- public PersistentActionCoordinator(Settings settings,
- PersistentActionService persistentActionService,
- PersistentActionRegistry persistentActionRegistry,
- TaskManager taskManager,
- PersistentActionExecutor persistentActionExecutor) {
+ public PersistentTasksNodeService(Settings settings,
+ PersistentTasksService persistentTasksService,
+ PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
+ TaskManager taskManager, ThreadPool threadPool,
+ NodePersistentTasksExecutor nodePersistentTasksExecutor) {
super(settings);
- this.persistentActionService = persistentActionService;
- this.persistentActionRegistry = persistentActionRegistry;
+ this.persistentTasksService = persistentTasksService;
+ this.persistentTasksExecutorRegistry = persistentTasksExecutorRegistry;
this.taskManager = taskManager;
- this.persistentActionExecutor = persistentActionExecutor;
+ this.threadPool = threadPool;
+ this.nodePersistentTasksExecutor = nodePersistentTasksExecutor;
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
- PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE);
- PersistentTasks previousTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+ PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node
@@ -111,10 +115,9 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
}
- private void startTask(PersistentTask taskInProgress) {
- PersistentActionRegistry.PersistentActionHolder holder =
- persistentActionRegistry.getPersistentActionHolderSafe(taskInProgress.getAction());
- NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
+ private void startTask(PersistentTask taskInProgress) {
+ PersistentTasksExecutor action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
+ NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
taskInProgress.getRequest());
boolean processed = false;
try {
@@ -124,7 +127,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask);
try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask);
- persistentActionExecutor.executeAction(taskInProgress.getRequest(), task, holder, listener);
+ nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener);
} catch (Exception e) {
// Submit task failure
listener.onFailure(e);
@@ -149,9 +152,11 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
if (task != null && task.getTask() != null) {
if (task.markAsCancelled()) {
- persistentActionService.sendCancellation(task.getTask().getId(), new ActionListener() {
+ persistentTasksService.sendCancellation(task.getTask().getId(), new PersistentTaskOperationListener() {
@Override
- public void onResponse(CancelTasksResponse cancelTasksResponse) {
+ public void onResponse(long taskId) {
+ logger.trace("Persistent task with id {} was cancelled", taskId);
+
}
@Override
@@ -171,7 +176,24 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
taskManager.unregister(task.getTask());
} else {
if (task.restartCompletionNotification()) {
- persistentActionService.sendCompletionNotification(task.getId(), task.getFailure(), new PublishedResponseListener(task));
+ // Need to fork otherwise: java.lang.AssertionError: should not be called by a cluster state applier.
+ // reason [the applied cluster state is not yet available])
+ PublishedResponseListener listener = new PublishedResponseListener(task);
+ try {
+ threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ persistentTasksService.sendCompletionNotification(task.getId(), task.getFailure(), listener);
+ }
+ });
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
} else {
logger.warn("attempt to resend notification for task {} in the {} state", task.getId(), task.getState());
}
@@ -184,7 +206,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
} else {
logger.trace("sending notification for failed task {}", task.getId());
if (task.startNotification(e)) {
- persistentActionService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task));
+ persistentTasksService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task));
} else {
logger.warn("attempt to send notification for task {} in the {} state", task.getId(), task.getState());
}
@@ -223,7 +245,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
}
}
- private class PublishedResponseListener implements ActionListener {
+ private class PublishedResponseListener implements PersistentTaskOperationListener {
private final RunningPersistentTask task;
PublishedResponseListener(final RunningPersistentTask task) {
@@ -232,7 +254,7 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
@Override
- public void onResponse(CompletionPersistentTaskAction.Response response) {
+ public void onResponse(long taskId) {
logger.trace("notification for task {} was successful", task.getId());
if (task.markAsNotified() == false) {
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getId(), task.getState());
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java
new file mode 100644
index 00000000000..e636393d751
--- /dev/null
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.persistent;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+
+/**
+ * This service is used by persistent actions to propagate changes in the action state and notify about completion
+ */
+public class PersistentTasksService extends AbstractComponent {
+
+ private final Client client;
+ private final ClusterService clusterService;
+
+ public PersistentTasksService(Settings settings, ClusterService clusterService, Client client) {
+ super(settings);
+ this.client = client;
+ this.clusterService = clusterService;
+ }
+
+ /**
+ * Creates the specified persistent action and tries to start it immediately, upon completion the task is
+ * removed from the cluster state
+ */
+ public void createPersistentActionTask(String action, Request request,
+ PersistentTaskOperationListener listener) {
+ createPersistentActionTask(action, request, false, true, listener);
+ }
+
+ /**
+ * Creates the specified persistent action. The action is started unless the stopped parameter is equal to true.
+ * If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
+ * Otherwise it will remain there in the stopped state.
+ */
+ public void createPersistentActionTask(String action, Request request,
+ boolean stopped,
+ boolean removeOnCompletion,
+ PersistentTaskOperationListener listener) {
+ CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request);
+ createPersistentActionRequest.setStopped(stopped);
+ createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion);
+ try {
+ client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
+ o -> listener.onResponse(o.getTaskId()), listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ /**
+ * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
+ *
+ */
+ public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) {
+ CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
+ try {
+ client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
+ listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ /**
+ * Cancels the persistent task.
+ */
+ public void sendCancellation(long taskId, PersistentTaskOperationListener listener) {
+ DiscoveryNode localNode = clusterService.localNode();
+ CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
+ cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
+ cancelTasksRequest.setReason("persistent action was removed");
+ try {
+ client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
+ listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ /**
+ * Updates status of the persistent task
+ */
+ public void updateStatus(long taskId, Task.Status status, PersistentTaskOperationListener listener) {
+ UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, status);
+ try {
+ client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
+ o -> listener.onResponse(taskId), listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ /**
+ * Removes a persistent task
+ */
+ public void removeTask(long taskId, PersistentTaskOperationListener listener) {
+ RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
+ try {
+ client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
+ listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ /**
+ * Starts a persistent task
+ */
+ public void startTask(long taskId, PersistentTaskOperationListener listener) {
+ StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(taskId);
+ try {
+ client.execute(StartPersistentTaskAction.INSTANCE, startRequest, ActionListener.wrap(o -> listener.onResponse(taskId),
+ listener::onFailure));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ public interface PersistentTaskOperationListener {
+ void onResponse(long taskId);
+ void onFailure(Exception e);
+ }
+
+}
\ No newline at end of file
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java
index 180defc03b0..d43ab432929 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java
@@ -148,16 +148,16 @@ public class RemovePersistentTaskAction extends Action {
- private final PersistentTaskClusterService persistentTaskClusterService;
+ private final PersistentTasksClusterService persistentTasksClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
- PersistentTaskClusterService persistentTaskClusterService,
+ PersistentTasksClusterService persistentTasksClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, RemovePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
- this.persistentTaskClusterService = persistentTaskClusterService;
+ this.persistentTasksClusterService = persistentTasksClusterService;
}
@Override
@@ -178,7 +178,7 @@ public class RemovePersistentTaskAction extends Action listener) {
- persistentTaskClusterService.removePersistentTask(request.taskId, new ActionListener() {
+ persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java
index 90a7b7023a9..d52c6f77693 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java
@@ -31,7 +31,7 @@ import java.io.IOException;
import java.util.Objects;
/**
- * This action can be used to start persistent action previously created using {@link CreatePersistentTaskAction}
+ * This action can be used to start a persistent task previously created using {@link CreatePersistentTaskAction}
*/
public class StartPersistentTaskAction extends Action {
- private final PersistentTaskClusterService persistentTaskClusterService;
+ private final PersistentTasksClusterService persistentTasksClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
- PersistentTaskClusterService persistentTaskClusterService,
+ PersistentTasksClusterService persistentTasksClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
- this.persistentTaskClusterService = persistentTaskClusterService;
+ this.persistentTasksClusterService = persistentTasksClusterService;
}
@Override
@@ -181,7 +181,7 @@ public class StartPersistentTaskAction extends Action listener) {
- persistentTaskClusterService.startPersistentTask(request.taskId, new ActionListener() {
+ persistentTasksClusterService.startPersistentTask(request.taskId, new ActionListener() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java
index d068a647ab0..f40dd323c93 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java
@@ -164,16 +164,16 @@ public class UpdatePersistentTaskStatusAction extends Action {
- private final PersistentTaskClusterService persistentTaskClusterService;
+ private final PersistentTasksClusterService persistentTasksClusterService;
@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
- PersistentTaskClusterService persistentTaskClusterService,
+ PersistentTasksClusterService persistentTasksClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, UpdatePersistentTaskStatusAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
- this.persistentTaskClusterService = persistentTaskClusterService;
+ this.persistentTasksClusterService = persistentTasksClusterService;
}
@Override
@@ -194,7 +194,7 @@ public class UpdatePersistentTaskStatusAction extends Action listener) {
- persistentTaskClusterService.updatePersistentTaskStatus(request.taskId, request.status, new ActionListener() {
+ persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.status, new ActionListener() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(new Response(true));
diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java
index d97640dee02..8bcd033eb7e 100644
--- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java
+++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java
@@ -5,31 +5,30 @@
*/
/**
- * The Persistent Actions are actions responsible for executing restartable actions that can survive disappearance of a
+ * The Persistent Tasks Executors are responsible for executing restartable tasks that can survive disappearance of a
* coordinating and executor nodes.
*
- * In order to be resilient to node restarts, the persistent actions are using the cluster state instead of a transport service to send
+ * In order to be resilient to node restarts, the persistent tasks are using the cluster state instead of a transport service to send
* requests and responses. The execution is done in six phases:
*
- * 1. The coordinating node sends an ordinary transport request to the master node to start a new persistent action. This action is handled
- * by the {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which is using
- * {@link org.elasticsearch.xpack.persistent.PersistentTaskClusterService} to update cluster state with the record about running persistent
+ * 1. The coordinating node sends an ordinary transport request to the master node to start a new persistent task. This task is handled
+ * by the {@link org.elasticsearch.xpack.persistent.PersistentTasksService}, which is using
+ * {@link org.elasticsearch.xpack.persistent.PersistentTasksClusterService} to update cluster state with the record about running persistent
* task.
*
- * 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasks} in the cluster state to indicate that
- * there is a new persistent action
- * running in the system.
+ * 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate
+ * that there is a new persistent task is running in the system.
*
- * 3. The {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} running on every node in the cluster monitors changes in
- * the cluster state and starts execution of all new actions assigned to the node it is running on.
+ * 3. The {@link org.elasticsearch.xpack.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in
+ * the cluster state and starts execution of all new tasks assigned to the node it is running on.
*
- * 4. If the action fails to start on the node, the {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} uses the
- * {@link org.elasticsearch.xpack.persistent.PersistentTasks} to notify the
- * {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which reassigns the action to another node in the cluster.
+ * 4. If the task fails to start on the node, the {@link org.elasticsearch.xpack.persistent.PersistentTasksNodeService} uses the
+ * {@link org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData} to notify the
+ * {@link org.elasticsearch.xpack.persistent.PersistentTasksService}, which reassigns the action to another node in the cluster.
*
- * 5. If action finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the
- * cluster state unless .
+ * 5. If a task finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the
+ * cluster state unless removeOnCompletion flag for this task is set to false.
*
- * 6. The {@link org.elasticsearch.xpack.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent action.
+ * 6. The {@link org.elasticsearch.xpack.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent task.
*/
package org.elasticsearch.xpack.persistent;
\ No newline at end of file
diff --git a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java
index 59b982fb4ef..0d8903fdc72 100644
--- a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java
+++ b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java
@@ -28,8 +28,7 @@ import org.elasticsearch.xpack.ml.client.MachineLearningClient;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
-import org.elasticsearch.xpack.persistent.PersistentActionResponse;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.junit.Before;
import java.util.Collections;
@@ -99,7 +98,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do not work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), listener);
listener.actionGet();
fail("open job action should not be enabled!");
@@ -123,9 +122,9 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), listener);
- PersistentActionResponse response = listener.actionGet();
+ OpenJobAction.Response response = listener.actionGet();
assertNotNull(response);
}
}
@@ -192,12 +191,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse);
// open job
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
// start datafeed
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
listener.actionGet();
}
@@ -218,7 +217,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
assertEquals(DatafeedState.STOPPED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
- PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(0, tasks.taskMap().size());
});
@@ -228,12 +227,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// open job
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
// start datafeed
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
listener.actionGet();
}
@@ -246,7 +245,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
assertEquals(DatafeedState.STARTED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
- PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(2, tasks.taskMap().size());
});
@@ -266,7 +265,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
assertEquals(DatafeedState.STOPPED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
- PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(0, tasks.taskMap().size());
});
}
@@ -287,9 +286,9 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener);
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse);
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
}
@@ -303,14 +302,14 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.CLOSED, jobState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
- PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(0, tasks.taskMap().size());
});
// test that license restricted apis do not work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
listener.actionGet();
fail("start datafeed action should not be enabled!");
@@ -328,14 +327,14 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// re-open job now that the license is valid again
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
- PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
- PersistentActionResponse response = listener.actionGet();
+ StartDatafeedAction.Response response = listener.actionGet();
assertNotNull(response);
}
}
@@ -356,14 +355,14 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener);
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse);
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
- PlainListenableActionFuture startDatafeedListener = new PlainListenableActionFuture<>(
+ PlainListenableActionFuture startDatafeedListener = new PlainListenableActionFuture<>(
client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), startDatafeedListener);
- PersistentActionResponse startDatafeedResponse = startDatafeedListener.actionGet();
+ StartDatafeedAction.Response startDatafeedResponse = startDatafeedListener.actionGet();
assertNotNull(startDatafeedResponse);
}
@@ -400,9 +399,9 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
- PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
+ PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
- PersistentActionResponse openJobResponse = openJobListener.actionGet();
+ OpenJobAction.Response openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
index 3fa8ac92251..8d5f2720cd6 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java
@@ -26,8 +26,8 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
-import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.io.IOException;
import java.util.Collections;
@@ -36,7 +36,7 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig;
import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
-import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT;
+import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@@ -133,7 +133,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue());
- builder.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap()));
+ builder.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap()));
result = builder.build();
assertThat(result.getJobs().get("1"), nullValue());
assertThat(result.getDatafeeds().get("1"), nullValue());
@@ -151,7 +151,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
PersistentTask task = createJobTask(0L, "1", null, JobState.CLOSED);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
- () -> builder2.deleteJob("1", new PersistentTasks(0L, Collections.singletonMap(0L, task))));
+ () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task))));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
}
@@ -163,7 +163,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
builder.putDatafeed(datafeedConfig1);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
- () -> builder.deleteJob(job1.getId(), new PersistentTasks(0L, Collections.emptyMap())));
+ () -> builder.deleteJob(job1.getId(), new PersistentTasksCustomMetaData(0L, Collections.emptyMap())));
assertThat(e.status(), equalTo(RestStatus.CONFLICT));
String expectedMsg = "Cannot delete job [" + job1.getId() + "] while datafeed [" + datafeedConfig1.getId() + "] refers to it";
assertThat(e.getMessage(), equalTo(expectedMsg));
@@ -172,7 +172,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
public void testRemoveJob_failBecauseJobDoesNotExist() {
MlMetadata.Builder builder1 = new MlMetadata.Builder();
expectThrows(ResourceNotFoundException.class,
- () -> builder1.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap())));
+ () -> builder1.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.emptyMap())));
}
public void testCrudDatafeed() {
@@ -187,7 +187,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1));
builder = new MlMetadata.Builder(result);
- builder.removeDatafeed("datafeed1", new PersistentTasks(0, Collections.emptyMap()));
+ builder.removeDatafeed("datafeed1", new PersistentTasksCustomMetaData(0, Collections.emptyMap()));
result = builder.build();
assertThat(result.getJobs().get("job_id"), sameInstance(job1));
assertThat(result.getDatafeeds().get("datafeed1"), nullValue());
@@ -271,8 +271,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTask taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
- PersistentTasks tasksInProgress =
- new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
+ PersistentTasksCustomMetaData tasksInProgress =
+ new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId());
update.setScrollSize(5000);
@@ -333,8 +333,8 @@ public class MlMetadataTests extends AbstractSerializingTestCase {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTask taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT);
- PersistentTasks tasksInProgress =
- new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
+ PersistentTasksCustomMetaData tasksInProgress =
+ new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java
index 0a1e70563da..44014744c62 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java
@@ -13,11 +13,11 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
-import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
import java.util.Collections;
import java.util.HashMap;
@@ -35,10 +35,11 @@ public class CloseJobActionTests extends ESTestCase {
createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED));
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task))));
+ .putCustom(PersistentTasksCustomMetaData.TYPE,
+ new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task))));
ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build());
- PersistentTasks actualTasks = result.getMetaData().custom(PersistentTasks.TYPE);
+ PersistentTasksCustomMetaData actualTasks = result.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus());
MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE);
@@ -49,7 +50,7 @@ public class CloseJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap())));
+ .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap())));
expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()));
}
@@ -59,14 +60,15 @@ public class CloseJobActionTests extends ESTestCase {
PersistentTask task = createJobTask(1L, "job_id", null, JobState.OPENING);
ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task))));
+ .putCustom(PersistentTasksCustomMetaData.TYPE,
+ new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task))));
ElasticsearchStatusException result =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build()));
assertEquals("cannot close job [job_id], expected job state [opened], but got [opening]", result.getMessage());
ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap())));
+ .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, Collections.emptyMap())));
result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build()));
assertEquals("cannot close job [job_id], expected job state [opened], but got [closed]", result.getMessage());
}
@@ -81,7 +83,7 @@ public class CloseJobActionTests extends ESTestCase {
tasks.put(2L, createDatafeedTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build();
+ .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build();
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.validateAndFindTask("job_id", cs1));
@@ -95,7 +97,7 @@ public class CloseJobActionTests extends ESTestCase {
}
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
- .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, tasks))).build();
+ .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(1L, tasks))).build();
assertEquals(jobTask, CloseJobAction.validateAndFindTask("job_id", cs2));
}
@@ -103,7 +105,7 @@ public class CloseJobActionTests extends ESTestCase {
String nodeId, DatafeedState datafeedState) {
PersistentTask task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), false, true,
- new PersistentTasks.Assignment(nodeId, "test assignment"));
+ new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, datafeedState);
return task;
}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
index c954d4156a5..7729b68fab5 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
@@ -24,10 +24,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
-import org.elasticsearch.xpack.persistent.PersistentActionCoordinator;
-import org.elasticsearch.xpack.persistent.PersistentActionRequest;
-import org.elasticsearch.xpack.persistent.PersistentActionResponse;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
+import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
+import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.security.Security;
import org.junit.After;
@@ -70,12 +69,13 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
List entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
- entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new));
- entries.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME,
+ entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
+ PersistentTasksCustomMetaData::new));
+ entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, StartDatafeedAction.NAME,
StartDatafeedAction.Request::new));
- entries.add(new NamedWriteableRegistry.Entry(PersistentActionRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new));
- entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME,
- PersistentActionCoordinator.Status::new));
+ entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new));
+ entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
+ PersistentTasksNodeService.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
@@ -187,8 +187,7 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
assertTrue(putDatafeedResponse.isAcknowledged());
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
- PersistentActionResponse startDatafeedResponse =
- client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
+ StartDatafeedAction.Response startDatafeedResponse = client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
index d34e8e80a06..25700d03dbe 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java
@@ -29,12 +29,11 @@ import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
-import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
-import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -58,14 +57,14 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTask task =
createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED));
- PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
- OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasks(1L, Collections.emptyMap()), nodes);
+ OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()), nodes);
OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes);
task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED);
- tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
+ tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes);
}
@@ -95,7 +94,7 @@ public class OpenJobActionTests extends ESTestCase {
JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING);
PersistentTask task = createJobTask(1L, "job_id", "_node_id", jobState);
- PersistentTasks tasks1 = new PersistentTasks(1L, Collections.singletonMap(1L, task));
+ PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes));
@@ -103,7 +102,7 @@ public class OpenJobActionTests extends ESTestCase {
jobState = randomFrom(JobState.OPENING, JobState.CLOSING);
task = createJobTask(1L, "job_id", "_other_node_id", jobState);
- PersistentTasks tasks2 = new PersistentTasks(1L, Collections.singletonMap(1L, task));
+ PersistentTasksCustomMetaData tasks2 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks2, nodes));
@@ -129,14 +128,14 @@ public class OpenJobActionTests extends ESTestCase {
new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true,
new Assignment("_node_id2", "test assignment")));
- PersistentTasks tasks = new PersistentTasks(3L, taskMap);
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4");
cs.nodes(nodes);
- metaData.putCustom(PersistentTasks.TYPE, tasks);
+ metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger);
@@ -160,14 +159,14 @@ public class OpenJobActionTests extends ESTestCase {
taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED));
}
}
- PersistentTasks tasks = new PersistentTasks(numNodes * maxRunningJobsPerNode, taskMap);
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(numNodes * maxRunningJobsPerNode, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
- metaData.putCustom(PersistentTasks.TYPE, tasks);
+ metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
@@ -187,14 +186,14 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTask task =
new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true,
new Assignment("_node_id1", "test assignment"));
- PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task));
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2");
cs.nodes(nodes);
- metaData.putCustom(PersistentTasks.TYPE, tasks);
+ metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
cs.metaData(metaData);
cs.routingTable(routingTable.build());
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger);
@@ -220,7 +219,7 @@ public class OpenJobActionTests extends ESTestCase {
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING));
- PersistentTasks tasks = new PersistentTasks(5L, taskMap);
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
csBuilder.nodes(nodes);
@@ -228,7 +227,7 @@ public class OpenJobActionTests extends ESTestCase {
RoutingTable.Builder routingTable = RoutingTable.builder();
addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7");
csBuilder.routingTable(routingTable.build());
- metaData.putCustom(PersistentTasks.TYPE, tasks);
+ metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
csBuilder.metaData(metaData);
ClusterState cs = csBuilder.build();
@@ -237,30 +236,30 @@ public class OpenJobActionTests extends ESTestCase {
PersistentTask lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING);
taskMap.put(5L, lastTask);
- tasks = new PersistentTasks(6L, taskMap);
+ tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs);
- csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
+ csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, false, new Assignment("_node_id3", "test assignment")));
- tasks = new PersistentTasks(6L, taskMap);
+ tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs);
- csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
+ csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, null));
- tasks = new PersistentTasks(6L, taskMap);
+ tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs);
- csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks));
+ csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
cs = csBuilder.build();
result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger);
assertNull("no node selected, because null state", result.getExecutorNode());
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java
index 91c3b281cf1..951c141b7b3 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java
@@ -23,9 +23,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
-import org.elasticsearch.xpack.persistent.PersistentTasks;
-import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
-import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.net.InetAddress;
import java.util.Collections;
@@ -35,7 +35,7 @@ import java.util.Map;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
-import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT;
+import static org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT;
import static org.hamcrest.Matchers.equalTo;
public class StartDatafeedActionTests extends ESTestCase {
@@ -48,7 +48,7 @@ public class StartDatafeedActionTests extends ESTestCase {
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING);
PersistentTask task = createJobTask(0L, job.getId(), "node_id", jobState);
- PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@@ -57,7 +57,7 @@ public class StartDatafeedActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
- .putCustom(PersistentTasks.TYPE, tasks))
+ .putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
.nodes(nodes);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
@@ -66,10 +66,10 @@ public class StartDatafeedActionTests extends ESTestCase {
"] while state [opened] is required", result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED);
- tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
+ tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
- .putCustom(PersistentTasks.TYPE, tasks))
+ .putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
.nodes(nodes);
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id", result.getExecutorNode());
@@ -83,7 +83,7 @@ public class StartDatafeedActionTests extends ESTestCase {
String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTask task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED);
- PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
+ PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@@ -92,7 +92,7 @@ public class StartDatafeedActionTests extends ESTestCase {
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
- .putCustom(PersistentTasks.TYPE, tasks))
+ .putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
.nodes(nodes);
Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
@@ -101,10 +101,10 @@ public class StartDatafeedActionTests extends ESTestCase {
result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED);
- tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task));
+ tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task));
cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())
- .putCustom(PersistentTasks.TYPE, tasks))
+ .putCustom(PersistentTasksCustomMetaData.TYPE, tasks))
.nodes(nodes);
result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build());
assertEquals("node_id1", result.getExecutorNode());
@@ -128,7 +128,7 @@ public class StartDatafeedActionTests extends ESTestCase {
PersistentTask