From 9fc6ce83eef8e4ab7c7104e41a181b16d39e4947 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 27 Mar 2017 14:21:01 -0400 Subject: [PATCH] Persistent Tasks: Merge NodePersistentTask and RunningPersistentTask (elastic/x-pack-elasticsearch#842) Refactors NodePersistentTask and RunningPersistentTask into a single AllocatedPersistentTask. Makes it possible to update Persistent Task Status via AllocatedPersistentTask. Original commit: elastic/x-pack-elasticsearch@8f59d7b81937fb8830ad466eaa1a3e51a9e734bc --- dev-tools/checkstyle_suppressions.xml | 2 +- .../xpack/ml/action/OpenJobAction.java | 6 +- .../xpack/ml/action/StartDatafeedAction.java | 8 +- .../persistent/AllocatedPersistentTask.java | 111 ++++++++++++ .../xpack/persistent/NodePersistentTask.java | 61 ------- .../NodePersistentTasksExecutor.java | 2 +- .../persistent/PersistentTaskRequest.java | 2 +- .../persistent/PersistentTasksExecutor.java | 4 +- .../PersistentTasksNodeService.java | 168 +++++------------- ...PersistentTasksNodeServiceStatusTests.java | 3 +- .../PersistentTasksNodeServiceTests.java | 6 +- .../persistent/TestPersistentTasksPlugin.java | 4 +- 12 files changed, 175 insertions(+), 202 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java delete mode 100644 plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java diff --git a/dev-tools/checkstyle_suppressions.xml b/dev-tools/checkstyle_suppressions.xml index ae10c3804da..75f1a634f49 100644 --- a/dev-tools/checkstyle_suppressions.xml +++ b/dev-tools/checkstyle_suppressions.xml @@ -373,9 +373,9 @@ + - diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 9d2814a13c1..1697e7a0861 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -51,7 +51,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.JobStateObserver; -import org.elasticsearch.xpack.persistent.NodePersistentTask; +import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; @@ -251,7 +251,7 @@ public class OpenJobAction extends Action listener) { + protected void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener listener) { JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index d877548a64d..e8ac9e6da9b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -56,7 +56,7 @@ import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.NodePersistentTask; +import org.elasticsearch.xpack.persistent.AllocatedPersistentTask; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; @@ -292,7 +292,7 @@ public class StartDatafeedAction } } - public static class DatafeedTask extends NodePersistentTask { + public static class DatafeedTask extends AllocatedPersistentTask { private final String datafeedId; private final long startTime; @@ -421,9 +421,9 @@ public class StartDatafeedAction } @Override - protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request, + protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request, ActionListener listener) { - DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask; + DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedJobRunner = datafeedJobRunner; datafeedJobRunner.run(datafeedTask, (error) -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java new file mode 100644 index 00000000000..b85b76d20e1 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Represents a executor node operation that corresponds to a persistent task + */ +public class AllocatedPersistentTask extends CancellableTask { + private long persistentTaskId; + + private final AtomicReference state; + @Nullable + private Exception failure; + + private PersistentTasksService persistentTasksService; + + + public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) { + super(id, type, action, description, parentTask); + this.state = new AtomicReference<>(State.STARTED); + } + + @Override + public boolean shouldCancelChildrenOnCancellation() { + return true; + } + + // In case of persistent tasks we always need to return: `false` + // because in case of persistent task the parent task isn't a task in the task manager, but in cluster state. + // This instructs the task manager not to try to kill this persistent task when the task manager cannot find + // a fake parent node id "cluster" in the cluster state + @Override + public final boolean cancelOnParentLeaving() { + return false; + } + + @Override + public Status getStatus() { + return new PersistentTasksNodeService.Status(state.get()); + } + + /** + * Updates the persistent state for the corresponding persistent task. + * + * This doesn't affect the status of this allocated task. + */ + public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) { + persistentTasksService.updateStatus(persistentTaskId, status, listener); + } + + public long getPersistentTaskId() { + return persistentTaskId; + } + + void init(PersistentTasksService persistentTasksService, long persistentTaskId) { + this.persistentTasksService = persistentTasksService; + this.persistentTaskId = persistentTaskId; + } + + public Exception getFailure() { + return failure; + } + + boolean startNotification(Exception failure) { + boolean result = state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.FAILED); + if (result) { + this.failure = failure; + } + return result; + } + + boolean notificationFailed() { + return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.FAILED_NOTIFICATION); + } + + boolean restartCompletionNotification() { + return state.compareAndSet(AllocatedPersistentTask.State.FAILED_NOTIFICATION, AllocatedPersistentTask.State.FAILED); + } + + boolean markAsNotified() { + return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.NOTIFIED); + } + + boolean markAsCancelled() { + return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.CANCELLED); + } + + public State getState() { + return state.get(); + } + + public enum State { + STARTED, // the task is currently running + CANCELLED, // the task is cancelled + FAILED, // the task is done running and trying to notify caller + FAILED_NOTIFICATION, // the caller notification failed + NOTIFIED // the caller was notified, the task can be removed + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java deleted file mode 100644 index f02aefb7117..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.persistent; - -import org.elasticsearch.common.inject.Provider; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.TaskId; - -/** - * Represents a executor node operation that corresponds to a persistent task - */ -public class NodePersistentTask extends CancellableTask { - private Provider statusProvider; - - private long persistentTaskId; - - public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) { - super(id, type, action, description, parentTask); - } - - @Override - public boolean shouldCancelChildrenOnCancellation() { - return true; - } - - // In case of persistent tasks we always need to return: `false` - // because in case of persistent task the parent task isn't a task in the task manager, but in cluster state. - // This instructs the task manager not to try to kill this persistent task when the task manager cannot find - // a fake parent node id "cluster" in the cluster state - @Override - public final boolean cancelOnParentLeaving() { - return false; - } - - @Override - public Status getStatus() { - Provider statusProvider = this.statusProvider; - if (statusProvider != null) { - return statusProvider.get(); - } else { - return null; - } - } - - public void setStatusProvider(Provider statusProvider) { - assert this.statusProvider == null; - this.statusProvider = statusProvider; - } - - public long getPersistentTaskId() { - return persistentTaskId; - } - - public void setPersistentTaskId(long persistentTaskId) { - this.persistentTaskId = persistentTaskId; - } - -} \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java index de4802c24ca..ad8ed3ca0d4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java @@ -21,7 +21,7 @@ public class NodePersistentTasksExecutor { } public void executeTask(Request request, - NodePersistentTask task, + AllocatedPersistentTask task, PersistentTasksExecutor action, ActionListener listener) { threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java index 2f753fd8220..38205801ca9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskRequest.java @@ -17,6 +17,6 @@ import org.elasticsearch.tasks.TaskId; public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new NodePersistentTask(id, type, action, getDescription(), parentTaskId); + return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId); } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java index 118077fd2da..8b8c7de0323 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java @@ -93,7 +93,7 @@ public abstract class PersistentTasksExecutor listener) { + protected void updatePersistentTaskStatus(AllocatedPersistentTask task, Task.Status status, ActionListener listener) { persistentTasksService.updateStatus(task.getPersistentTaskId(), status, new PersistentTaskOperationListener() { @Override @@ -115,7 +115,7 @@ public abstract class PersistentTasksExecutor listener); + protected abstract void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener listener); public String getExecutor() { return executor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java index 063bce14ca4..88234ed1553 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java @@ -11,10 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -34,7 +32,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import static java.util.Objects.requireNonNull; @@ -43,7 +40,7 @@ import static java.util.Objects.requireNonNull; * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. */ public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener { - private final Map runningTasks = new HashMap<>(); + private final Map runningTasks = new HashMap<>(); private final PersistentTasksService persistentTasksService; private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry; private final TaskManager taskManager; @@ -77,14 +74,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu for (PersistentTask taskInProgress : tasks.tasks()) { if (localNodeId.equals(taskInProgress.getExecutorNode())) { PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()); - RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId); + AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId); if (persistentTask == null) { // New task - let's start it startTask(taskInProgress); } else { // The task is still running notVisitedTasks.remove(persistentTaskId); - if (persistentTask.getState() == State.FAILED_NOTIFICATION) { + if (persistentTask.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) { // We tried to notify the master about this task before but the notification failed and // the master doesn't seem to know about it - retry notification restartCompletionNotification(persistentTask); @@ -95,14 +92,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } for (PersistentTaskId id : notVisitedTasks) { - RunningPersistentTask task = runningTasks.get(id); - if (task.getState() == State.NOTIFIED || task.getState() == State.FAILED) { + AllocatedPersistentTask task = runningTasks.get(id); + if (task.getState() == AllocatedPersistentTask.State.NOTIFIED || task.getState() == AllocatedPersistentTask.State.FAILED) { // Result was sent to the caller and the caller acknowledged acceptance of the result finishTask(id); - } else if (task.getState() == State.FAILED_NOTIFICATION) { + } else if (task.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) { // We tried to send result to master, but it failed and master doesn't know about this task // this shouldn't really happen, unless this node is severally out of sync with the master - logger.warn("failed to notify master about task {}", task.getId()); + logger.warn("failed to notify master about task {}", task.getPersistentTaskId()); finishTask(id); } else { // task is running locally, but master doesn't know about it - that means that the persistent task was removed @@ -117,16 +114,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu private void startTask(PersistentTask taskInProgress) { PersistentTasksExecutor action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName()); - NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", + AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]", taskInProgress.getRequest()); boolean processed = false; try { - RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId()); - task.setStatusProvider(runningPersistentTask); - task.setPersistentTaskId(taskInProgress.getId()); - PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask); + task.init(persistentTasksService, taskInProgress.getId()); + PersistentTaskListener listener = new PersistentTaskListener(task); try { - runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask); + runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task); nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener); } catch (Exception e) { // Submit task failure @@ -142,17 +137,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } private void finishTask(PersistentTaskId persistentTaskId) { - RunningPersistentTask task = runningTasks.remove(persistentTaskId); - if (task != null && task.getTask() != null) { - taskManager.unregister(task.getTask()); + AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); + if (task != null) { + taskManager.unregister(task); } } private void cancelTask(PersistentTaskId persistentTaskId) { - RunningPersistentTask task = runningTasks.remove(persistentTaskId); - if (task != null && task.getTask() != null) { + AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); + if (task != null) { if (task.markAsCancelled()) { - persistentTasksService.sendCancellation(task.getTask().getId(), new PersistentTaskOperationListener() { + persistentTasksService.sendCancellation(task.getId(), new PersistentTaskOperationListener() { @Override public void onResponse(long taskId) { logger.trace("Persistent task with id {} was cancelled", taskId); @@ -162,7 +157,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu @Override public void onFailure(Exception e) { // There is really nothing we can do in case of failure here - logger.warn((Supplier) () -> new ParameterizedMessage("failed to cancel task {}", task.getId()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e); } }); } @@ -170,10 +165,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } - private void restartCompletionNotification(RunningPersistentTask task) { - logger.trace("resending notification for task {}", task.getId()); - if (task.getState() == State.CANCELLED) { - taskManager.unregister(task.getTask()); + private void restartCompletionNotification(AllocatedPersistentTask task) { + logger.trace("resending notification for task {}", task.getPersistentTaskId()); + if (task.getState() == AllocatedPersistentTask.State.CANCELLED) { + taskManager.unregister(task); } else { if (task.restartCompletionNotification()) { // Need to fork otherwise: java.lang.AssertionError: should not be called by a cluster state applier. @@ -188,35 +183,35 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu @Override protected void doRun() throws Exception { - persistentTasksService.sendCompletionNotification(task.getId(), task.getFailure(), listener); + persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), task.getFailure(), listener); } }); } catch (Exception e) { listener.onFailure(e); } } else { - logger.warn("attempt to resend notification for task {} in the {} state", task.getId(), task.getState()); + logger.warn("attempt to resend notification for task {} in the {} state", task.getPersistentTaskId(), task.getState()); } } } - private void startCompletionNotification(RunningPersistentTask task, Exception e) { - if (task.getState() == State.CANCELLED) { - taskManager.unregister(task.getTask()); + private void startCompletionNotification(AllocatedPersistentTask task, Exception e) { + if (task.getState() == AllocatedPersistentTask.State.CANCELLED) { + taskManager.unregister(task); } else { - logger.trace("sending notification for failed task {}", task.getId()); + logger.trace("sending notification for failed task {}", task.getPersistentTaskId()); if (task.startNotification(e)) { - persistentTasksService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task)); + persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), e, new PublishedResponseListener(task)); } else { - logger.warn("attempt to send notification for task {} in the {} state", task.getId(), task.getState()); + logger.warn("attempt to send notification for task {} in the {} state", task.getPersistentTaskId(), task.getState()); } } } private class PersistentTaskListener implements ActionListener { - private final RunningPersistentTask task; + private final AllocatedPersistentTask task; - PersistentTaskListener(final RunningPersistentTask task) { + PersistentTaskListener(final AllocatedPersistentTask task) { this.task = task; } @@ -227,14 +222,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu @Override public void onFailure(Exception e) { - if (task.getTask().isCancelled()) { + if (task.isCancelled()) { // The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException if (e instanceof TaskCancelledException == false) { logger.warn((Supplier) () -> new ParameterizedMessage( "cancelled task {} failed with an exception, cancellation reason [{}]", - task.getId(), task.getTask().getReasonCancelled()), e); + task.getPersistentTaskId(), task.getReasonCancelled()), e); } - if (CancelTasksRequest.DEFAULT_REASON.equals(task.getTask().getReasonCancelled())) { + if (CancelTasksRequest.DEFAULT_REASON.equals(task.getReasonCancelled())) { startCompletionNotification(task, null); } else { startCompletionNotification(task, e); @@ -246,39 +241,31 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } private class PublishedResponseListener implements PersistentTaskOperationListener { - private final RunningPersistentTask task; + private final AllocatedPersistentTask task; - PublishedResponseListener(final RunningPersistentTask task) { + PublishedResponseListener(final AllocatedPersistentTask task) { this.task = task; } @Override public void onResponse(long taskId) { - logger.trace("notification for task {} was successful", task.getId()); + logger.trace("notification for task {} was successful", task.getPersistentTaskId()); if (task.markAsNotified() == false) { - logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getId(), task.getState()); + logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState()); } - taskManager.unregister(task.getTask()); + taskManager.unregister(task); } @Override public void onFailure(Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getId()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getPersistentTaskId()), e); if (task.notificationFailed() == false) { - logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getId(), task.getState()); + logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getPersistentTaskId(), task.getState()); } } } - public enum State { - STARTED, // the task is currently running - CANCELLED, // the task is cancelled - FAILED, // the task is done running and trying to notify caller - FAILED_NOTIFICATION, // the caller notification failed - NOTIFIED // the caller was notified, the task can be removed - } - private static class PersistentTaskId { private final long id; private final long allocationId; @@ -304,80 +291,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } } - private static class RunningPersistentTask implements Provider { - private final NodePersistentTask task; - private final long id; - private final AtomicReference state; - @Nullable - private Exception failure; - - RunningPersistentTask(NodePersistentTask task, long id) { - this(task, id, State.STARTED); - } - - RunningPersistentTask(NodePersistentTask task, long id, State state) { - this.task = task; - this.id = id; - this.state = new AtomicReference<>(state); - } - - public NodePersistentTask getTask() { - return task; - } - - public long getId() { - return id; - } - - public State getState() { - return state.get(); - } - - public Exception getFailure() { - return failure; - } - - public boolean startNotification(Exception failure) { - boolean result = state.compareAndSet(State.STARTED, State.FAILED); - if (result) { - this.failure = failure; - } - return result; - } - - public boolean notificationFailed() { - return state.compareAndSet(State.FAILED, State.FAILED_NOTIFICATION); - } - - public boolean restartCompletionNotification() { - return state.compareAndSet(State.FAILED_NOTIFICATION, State.FAILED); - } - - public boolean markAsNotified() { - return state.compareAndSet(State.FAILED, State.NOTIFIED); - } - - public boolean markAsCancelled() { - return state.compareAndSet(State.STARTED, State.CANCELLED); - } - - @Override - public Task.Status get() { - return new Status(state.get()); - } - } - public static class Status implements Task.Status { public static final String NAME = "persistent_executor"; - private final State state; + private final AllocatedPersistentTask.State state; - public Status(State state) { + public Status(AllocatedPersistentTask.State state) { this.state = requireNonNull(state, "State cannot be null"); } public Status(StreamInput in) throws IOException { - state = State.valueOf(in.readString()); + state = AllocatedPersistentTask.State.valueOf(in.readString()); } @Override @@ -403,7 +327,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu return Strings.toString(this); } - public State getState() { + public AllocatedPersistentTask.State getState() { return state; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceStatusTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceStatusTests.java index 4e7e5d57135..d22302b8883 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceStatusTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceStatusTests.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksNodeService.State; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService.Status; import static org.hamcrest.Matchers.containsString; @@ -16,7 +15,7 @@ public class PersistentTasksNodeServiceStatusTests extends AbstractWireSerializi @Override protected Status createTestInstance() { - return new Status(randomFrom(State.values())); + return new Status(randomFrom(AllocatedPersistentTask.State.values())); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index d8cae0cbb4b..bbcf518041b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -324,11 +324,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { private class Execution { private final PersistentTaskRequest request; - private final NodePersistentTask task; + private final AllocatedPersistentTask task; private final PersistentTasksExecutor holder; private final ActionListener listener; - Execution(PersistentTaskRequest request, NodePersistentTask task, PersistentTasksExecutor holder, + Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor holder, ActionListener listener) { this.request = request; this.task = task; @@ -345,7 +345,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void executeTask(Request request, NodePersistentTask task, + public void executeTask(Request request, AllocatedPersistentTask task, PersistentTasksExecutor action, ActionListener listener) { executions.add(new Execution(request, task, action, listener)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index fe2d6d16432..24954cdc8ae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -326,7 +326,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener listener) { + protected void nodeOperation(AllocatedPersistentTask task, TestRequest request, ActionListener listener) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -409,7 +409,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } - public static class TestTask extends NodePersistentTask { + public static class TestTask extends AllocatedPersistentTask { private volatile String operation; public TestTask(long id, String type, String action, String description, TaskId parentTask) {