From 49223a8782c24b7d09c9b2a90e9918838900513d Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 10 Apr 2017 17:32:30 -0400 Subject: [PATCH] Persistent Tasks: remove listener from PersistentTasksExecutor#nodeOperation (elastic/x-pack-elasticsearch#1032) Instead of having a separate listener for indicating that the current task is finished, this commit is switching to use allocated object itself. Original commit: elastic/x-pack-elasticsearch@7ad5362121dacc8d335899171b2ca5438a46d14a --- .../xpack/ml/action/OpenJobAction.java | 7 +- .../xpack/ml/action/StartDatafeedAction.java | 8 +- .../persistent/AllocatedPersistentTask.java | 79 +++++++++++++---- .../NodePersistentTasksExecutor.java | 11 +-- .../persistent/PersistentTasksExecutor.java | 7 +- .../PersistentTasksNodeService.java | 87 ++----------------- .../PersistentTasksNodeServiceTests.java | 21 ++--- .../persistent/TestPersistentTasksPlugin.java | 15 ++-- 8 files changed, 99 insertions(+), 136 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 6c683a6a80b..e4464553dda 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.ml.MachineLearning; @@ -414,14 +413,14 @@ public class OpenJobAction extends Action listener) { + protected void nodeOperation(AllocatedPersistentTask task, Request request) { JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; autodetectProcessManager.openJob(request.getJobId(), jobTask, request.isIgnoreDowntime(), e2 -> { if (e2 == null) { - listener.onResponse(new TransportResponse.Empty()); + task.markAsCompleted(); } else { - listener.onFailure(e2); + task.markAsFailed(e2); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 91233eddecf..3310a6d753f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -40,7 +40,6 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.ml.MlMetadata; @@ -437,16 +436,15 @@ public class StartDatafeedAction } @Override - protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request, - ActionListener listener) { + protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; datafeedManager.run(datafeedTask, (error) -> { if (error != null) { - listener.onFailure(error); + datafeedTask.markAsFailed(error); } else { - listener.onResponse(TransportResponse.Empty.INSTANCE); + datafeedTask.markAsCompleted(); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java index 54127c20c27..514eafb80d7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java @@ -5,11 +5,17 @@ */ package org.elasticsearch.xpack.persistent; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +31,8 @@ public class AllocatedPersistentTask extends CancellableTask { private Exception failure; private PersistentTasksService persistentTasksService; + private Logger logger; + private TaskManager taskManager; public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) { @@ -52,9 +60,9 @@ public class AllocatedPersistentTask extends CancellableTask { } /** - * Updates the persistent state for the corresponding persistent task. - * - * This doesn't affect the status of this allocated task. + * Updates the persistent state for the corresponding persistent task. + *

+ * This doesn't affect the status of this allocated task. */ public void updatePersistentStatus(Task.Status status, ActionListener> listener) { persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); @@ -64,26 +72,21 @@ public class AllocatedPersistentTask extends CancellableTask { return persistentTaskId; } - void init(PersistentTasksService persistentTasksService, long persistentTaskId, long allocationId) { + void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long + allocationId) { this.persistentTasksService = persistentTasksService; + this.logger = logger; + this.taskManager = taskManager; this.persistentTaskId = persistentTaskId; this.allocationId = allocationId; } - + public Exception getFailure() { return failure; } - State markAsCompleted(Exception failure) { - State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED); - if (prevState == State.STARTED || prevState == State.CANCELLED) { - this.failure = failure; - } - return prevState; - } - boolean markAsCancelled() { - return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.CANCELLED); + return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL); } public State getState() { @@ -96,7 +99,53 @@ public class AllocatedPersistentTask extends CancellableTask { public enum State { STARTED, // the task is currently running - CANCELLED, // the task is cancelled + PENDING_CANCEL, // the task is cancelled on master, cancelling it locally COMPLETED // the task is done running and trying to notify caller } + + public void markAsCompleted() { + completeAndNotifyIfNeeded(null); + } + + public void markAsFailed(Exception e) { + if (CancelTasksRequest.DEFAULT_REASON.equals(getReasonCancelled())) { + completeAndNotifyIfNeeded(null); + } else { + completeAndNotifyIfNeeded(e); + } + + } + + private void completeAndNotifyIfNeeded(@Nullable Exception failure) { + State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED); + if (prevState == State.COMPLETED) { + logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState); + } else { + if (failure != null) { + logger.warn((Supplier) () -> new ParameterizedMessage( + "task {} failed with an exception", getPersistentTaskId()), failure); + } + try { + this.failure = failure; + if (prevState == State.STARTED) { + logger.trace("sending notification for completed task {}", getPersistentTaskId()); + persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new + ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + logger.trace("notification for task {} was successful", getId()); + } + + @Override + public void onFailure(Exception e) { + logger.warn((Supplier) () -> + new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e); + } + }); + } + } finally { + taskManager.unregister(this); + } + } + } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java index 9406bd3f570..76c5d240209 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTasksExecutor.java @@ -5,10 +5,8 @@ */ package org.elasticsearch.xpack.persistent; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; /** * This component is responsible for execution of persistent tasks. @@ -24,21 +22,20 @@ public class NodePersistentTasksExecutor { public void executeTask(Request request, AllocatedPersistentTask task, - PersistentTasksExecutor action, - ActionListener listener) { + PersistentTasksExecutor action) { threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + task.markAsFailed(e); } @SuppressWarnings("unchecked") @Override protected void doRun() throws Exception { try { - action.nodeOperation(task, request, listener); + action.nodeOperation(task, request); } catch (Exception ex) { - listener.onFailure(ex); + task.markAsFailed(ex); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java index 12586bd4c91..2fc8ecfe9f8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutor.java @@ -85,11 +85,10 @@ public abstract class PersistentTasksExecutor - * If nodeOperation throws an exception or triggers listener.onFailure() method, the task will be restarted, - * possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully - * completed and will be removed from the cluster state and not restarted. + * NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to + * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener listener); + protected abstract void nodeOperation(AllocatedPersistentTask task, Request request); public String getExecutor() { return executor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java index b14395d75b0..ea238875d0d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeService.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.persistent; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; @@ -19,9 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; @@ -67,7 +64,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu // STARTED COMPLETED Noop - waiting for notification ack // NULL NULL Noop - nothing to do - // NULL STARTED Remove locally, Mark as CANCELLED, Cancel + // NULL STARTED Remove locally, Mark as PENDING_CANCEL, Cancel // NULL COMPLETED Remove locally // Master states: @@ -77,10 +74,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu // Local state: // NULL - we don't have task registered locally in runningTasks // STARTED - registered in TaskManager, requires master notification when finishes - // CANCELLED - registered in TaskManager, doesn't require master notification when finishes + // PENDING_CANCEL - registered in TaskManager, doesn't require master notification when finishes // COMPLETED - not registered in TaskManager, notified, waiting for master to remove it from CS so we can remove locally - // When task finishes if it is marked as STARTED or CANCELLED it is marked as COMPLETED and unregistered, + // When task finishes if it is marked as STARTED or PENDING_CANCEL it is marked as COMPLETED and unregistered, // If the task was STARTED, the master notification is also triggered (this is handled by unregisterTask() method, which is // triggered by PersistentTaskListener @@ -108,7 +105,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu AllocatedPersistentTask task = runningTasks.get(id); if (task.getState() == AllocatedPersistentTask.State.COMPLETED) { // Result was sent to the caller and the caller acknowledged acceptance of the result - finishTask(id); + runningTasks.remove(id); } else { // task is running locally, but master doesn't know about it - that means that the persistent task was removed // cancel the task without notifying master @@ -127,14 +124,13 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu taskInProgress.getRequest()); boolean processed = false; try { - task.init(persistentTasksService, taskInProgress.getId(), taskInProgress.getAllocationId()); - PersistentTaskListener listener = new PersistentTaskListener(task); + task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); try { runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener); + nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action); } catch (Exception e) { // Submit task failure - listener.onFailure(e); + task.markAsFailed(e); } processed = true; } finally { @@ -145,16 +141,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } } - /** - * Unregisters the locally running task. No notification to master will be send upon cancellation. - */ - private void finishTask(PersistentTaskId persistentTaskId) { - AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); - if (task != null) { - taskManager.unregister(task); - } - } - /** * Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon * cancellation. @@ -181,65 +167,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } } - private void unregisterTask(AllocatedPersistentTask task, Exception e) { - AllocatedPersistentTask.State prevState = task.markAsCompleted(e); - if (prevState == AllocatedPersistentTask.State.CANCELLED) { - // The task was cancelled by master - no need to send notifications - taskManager.unregister(task); - } else if (prevState == AllocatedPersistentTask.State.STARTED) { - // The task finished locally, but master doesn't know about it - we need notify the master before we can unregister it - logger.trace("sending notification for completed task {}", task.getPersistentTaskId()); - persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), e, new ActionListener>() { - @Override - public void onResponse(PersistentTask persistentTask) { - logger.trace("notification for task {} was successful", task.getId()); - taskManager.unregister(task); - } - - @Override - public void onFailure(Exception e) { - logger.warn((Supplier) () -> - new ParameterizedMessage("notification for task {} failed", task.getPersistentTaskId()), e); - taskManager.unregister(task); - } - }); - } else { - logger.warn("attempt to complete task {} in the {} state", task.getPersistentTaskId(), prevState); - } - } - - private class PersistentTaskListener implements ActionListener { - private final AllocatedPersistentTask task; - - PersistentTaskListener(final AllocatedPersistentTask task) { - this.task = task; - } - - @Override - public void onResponse(Empty response) { - unregisterTask(task, null); - } - - @Override - public void onFailure(Exception e) { - if (task.isCancelled()) { - // The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException - if (e instanceof TaskCancelledException == false) { - logger.warn((Supplier) () -> new ParameterizedMessage( - "cancelled task {} failed with an exception, cancellation reason [{}]", - task.getPersistentTaskId(), task.getReasonCancelled()), e); - } - if (CancelTasksRequest.DEFAULT_REASON.equals(task.getReasonCancelled())) { - unregisterTask(task, null); - } else { - unregisterTask(task, e); - } - } else { - unregisterTask(task, e); - } - } - } - private static class PersistentTaskId { private final long id; private final long allocationId; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index d73e8742575..4081cd750db 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; @@ -117,8 +116,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { assertThat(executor.size(), equalTo(2)); // Finish both tasks - executor.get(0).listener.onFailure(new RuntimeException()); - executor.get(1).listener.onResponse(Empty.INSTANCE); + executor.get(0).task.markAsFailed(new RuntimeException()); + executor.get(1).task.markAsCompleted(); long failedTaskId = executor.get(0).task.getParentTaskId().getId(); long finishedTaskId = executor.get(1).task.getParentTaskId().getId(); executor.clear(); @@ -203,7 +202,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Make sure it returns correct status assertThat(taskManager.getTasks().size(), equalTo(1)); - assertThat(taskManager.getTasks().values().iterator().next().getStatus().toString(), equalTo("{\"state\":\"CANCELLED\"}")); + assertThat(taskManager.getTasks().values().iterator().next().getStatus().toString(), equalTo("{\"state\":\"PENDING_CANCEL\"}")); // That should trigger cancellation request @@ -213,9 +212,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // finish or fail task if (randomBoolean()) { - executor.get(0).listener.onResponse(Empty.INSTANCE); + executor.get(0).task.markAsCompleted(); } else { - executor.get(0).listener.onFailure(new IOException("test")); + executor.get(0).task.markAsFailed(new IOException("test")); } // Check the the task is now removed from task manager @@ -251,14 +250,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { private final PersistentTaskRequest request; private final AllocatedPersistentTask task; private final PersistentTasksExecutor holder; - private final ActionListener listener; - Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor holder, - ActionListener listener) { + Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor holder) { this.request = request; this.task = task; this.holder = holder; - this.listener = listener; } } @@ -271,9 +267,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { @Override public void executeTask(Request request, AllocatedPersistentTask task, - PersistentTasksExecutor action, - ActionListener listener) { - executions.add(new Execution(request, task, action, listener)); + PersistentTasksExecutor action) { + executions.add(new Execution(request, task, action)); } public Execution get(int i) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index af58c1defee..10783fe973e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -45,7 +45,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; @@ -326,7 +325,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestRequest request, ActionListener listener) { + protected void nodeOperation(AllocatedPersistentTask task, TestRequest request) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -341,10 +340,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { return; } if ("finish".equals(testTask.getOperation())) { - listener.onResponse(Empty.INSTANCE); + task.markAsCompleted(); return; } else if ("fail".equals(testTask.getOperation())) { - listener.onFailure(new RuntimeException("Simulating failure")); + task.markAsFailed(new RuntimeException("Simulating failure")); return; } else if ("update_status".equals(testTask.getOperation())) { testTask.setOperation(null); @@ -370,12 +369,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { // Cancellation make cause different ways for the task to finish if (randomBoolean()) { if (randomBoolean()) { - listener.onFailure(new TaskCancelledException(testTask.getReasonCancelled())); + task.markAsFailed(new TaskCancelledException(testTask.getReasonCancelled())); } else { - listener.onResponse(Empty.INSTANCE); + task.markAsCompleted(); } } else { - listener.onFailure(new RuntimeException(testTask.getReasonCancelled())); + task.markAsFailed(new RuntimeException(testTask.getReasonCancelled())); } return; } else { @@ -383,7 +382,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { } } } catch (InterruptedException e) { - listener.onFailure(e); + task.markAsFailed(e); } } }