Persistent Tasks: remove listener from PersistentTasksExecutor#nodeOperation (elastic/x-pack-elasticsearch#1032)
Instead of having a separate listener for indicating that the current task is finished, this commit is switching to use allocated object itself. Original commit: elastic/x-pack-elasticsearch@7ad5362121
This commit is contained in:
parent
018a3d197d
commit
49223a8782
|
@ -38,7 +38,6 @@ import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.XPackPlugin;
|
import org.elasticsearch.xpack.XPackPlugin;
|
||||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||||
|
@ -414,14 +413,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
|
protected void nodeOperation(AllocatedPersistentTask task, Request request) {
|
||||||
JobTask jobTask = (JobTask) task;
|
JobTask jobTask = (JobTask) task;
|
||||||
jobTask.autodetectProcessManager = autodetectProcessManager;
|
jobTask.autodetectProcessManager = autodetectProcessManager;
|
||||||
autodetectProcessManager.openJob(request.getJobId(), jobTask, request.isIgnoreDowntime(), e2 -> {
|
autodetectProcessManager.openJob(request.getJobId(), jobTask, request.isIgnoreDowntime(), e2 -> {
|
||||||
if (e2 == null) {
|
if (e2 == null) {
|
||||||
listener.onResponse(new TransportResponse.Empty());
|
task.markAsCompleted();
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(e2);
|
task.markAsFailed(e2);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.license.XPackLicenseState;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.xpack.XPackPlugin;
|
import org.elasticsearch.xpack.XPackPlugin;
|
||||||
import org.elasticsearch.xpack.ml.MlMetadata;
|
import org.elasticsearch.xpack.ml.MlMetadata;
|
||||||
|
@ -437,16 +436,15 @@ public class StartDatafeedAction
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request,
|
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request) {
|
||||||
ActionListener<TransportResponse.Empty> listener) {
|
|
||||||
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
|
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
|
||||||
datafeedTask.datafeedManager = datafeedManager;
|
datafeedTask.datafeedManager = datafeedManager;
|
||||||
datafeedManager.run(datafeedTask,
|
datafeedManager.run(datafeedTask,
|
||||||
(error) -> {
|
(error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
listener.onFailure(error);
|
datafeedTask.markAsFailed(error);
|
||||||
} else {
|
} else {
|
||||||
listener.onResponse(TransportResponse.Empty.INSTANCE);
|
datafeedTask.markAsCompleted();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,11 +5,17 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.persistent;
|
package org.elasticsearch.xpack.persistent;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.tasks.CancellableTask;
|
import org.elasticsearch.tasks.CancellableTask;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
import org.elasticsearch.tasks.TaskCancelledException;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -25,6 +31,8 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
private Exception failure;
|
private Exception failure;
|
||||||
|
|
||||||
private PersistentTasksService persistentTasksService;
|
private PersistentTasksService persistentTasksService;
|
||||||
|
private Logger logger;
|
||||||
|
private TaskManager taskManager;
|
||||||
|
|
||||||
|
|
||||||
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||||
|
@ -53,7 +61,7 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the persistent state for the corresponding persistent task.
|
* Updates the persistent state for the corresponding persistent task.
|
||||||
*
|
* <p>
|
||||||
* This doesn't affect the status of this allocated task.
|
* This doesn't affect the status of this allocated task.
|
||||||
*/
|
*/
|
||||||
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||||
|
@ -64,8 +72,11 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
return persistentTaskId;
|
return persistentTaskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void init(PersistentTasksService persistentTasksService, long persistentTaskId, long allocationId) {
|
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long
|
||||||
|
allocationId) {
|
||||||
this.persistentTasksService = persistentTasksService;
|
this.persistentTasksService = persistentTasksService;
|
||||||
|
this.logger = logger;
|
||||||
|
this.taskManager = taskManager;
|
||||||
this.persistentTaskId = persistentTaskId;
|
this.persistentTaskId = persistentTaskId;
|
||||||
this.allocationId = allocationId;
|
this.allocationId = allocationId;
|
||||||
}
|
}
|
||||||
|
@ -74,16 +85,8 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
return failure;
|
return failure;
|
||||||
}
|
}
|
||||||
|
|
||||||
State markAsCompleted(Exception failure) {
|
|
||||||
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
|
|
||||||
if (prevState == State.STARTED || prevState == State.CANCELLED) {
|
|
||||||
this.failure = failure;
|
|
||||||
}
|
|
||||||
return prevState;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean markAsCancelled() {
|
boolean markAsCancelled() {
|
||||||
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.CANCELLED);
|
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
|
||||||
}
|
}
|
||||||
|
|
||||||
public State getState() {
|
public State getState() {
|
||||||
|
@ -96,7 +99,53 @@ public class AllocatedPersistentTask extends CancellableTask {
|
||||||
|
|
||||||
public enum State {
|
public enum State {
|
||||||
STARTED, // the task is currently running
|
STARTED, // the task is currently running
|
||||||
CANCELLED, // the task is cancelled
|
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
|
||||||
COMPLETED // the task is done running and trying to notify caller
|
COMPLETED // the task is done running and trying to notify caller
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void markAsCompleted() {
|
||||||
|
completeAndNotifyIfNeeded(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markAsFailed(Exception e) {
|
||||||
|
if (CancelTasksRequest.DEFAULT_REASON.equals(getReasonCancelled())) {
|
||||||
|
completeAndNotifyIfNeeded(null);
|
||||||
|
} else {
|
||||||
|
completeAndNotifyIfNeeded(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
|
||||||
|
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
|
||||||
|
if (prevState == State.COMPLETED) {
|
||||||
|
logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState);
|
||||||
|
} else {
|
||||||
|
if (failure != null) {
|
||||||
|
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
||||||
|
"task {} failed with an exception", getPersistentTaskId()), failure);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
this.failure = failure;
|
||||||
|
if (prevState == State.STARTED) {
|
||||||
|
logger.trace("sending notification for completed task {}", getPersistentTaskId());
|
||||||
|
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new
|
||||||
|
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
|
||||||
|
logger.trace("notification for task {} was successful", getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
logger.warn((Supplier<?>) () ->
|
||||||
|
new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
taskManager.unregister(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -5,10 +5,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.persistent;
|
package org.elasticsearch.xpack.persistent;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionListener;
|
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This component is responsible for execution of persistent tasks.
|
* This component is responsible for execution of persistent tasks.
|
||||||
|
@ -24,21 +22,20 @@ public class NodePersistentTasksExecutor {
|
||||||
|
|
||||||
public <Request extends PersistentTaskRequest> void executeTask(Request request,
|
public <Request extends PersistentTaskRequest> void executeTask(Request request,
|
||||||
AllocatedPersistentTask task,
|
AllocatedPersistentTask task,
|
||||||
PersistentTasksExecutor<Request> action,
|
PersistentTasksExecutor<Request> action) {
|
||||||
ActionListener<Empty> listener) {
|
|
||||||
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
|
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
listener.onFailure(e);
|
task.markAsFailed(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() throws Exception {
|
protected void doRun() throws Exception {
|
||||||
try {
|
try {
|
||||||
action.nodeOperation(task, request, listener);
|
action.nodeOperation(task, request);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
listener.onFailure(ex);
|
task.markAsFailed(ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,11 +85,10 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
||||||
/**
|
/**
|
||||||
* This operation will be executed on the executor node.
|
* This operation will be executed on the executor node.
|
||||||
* <p>
|
* <p>
|
||||||
* If nodeOperation throws an exception or triggers listener.onFailure() method, the task will be restarted,
|
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
|
||||||
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
|
* indicate that the persistent task has finished.
|
||||||
* completed and will be removed from the cluster state and not restarted.
|
|
||||||
*/
|
*/
|
||||||
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<Empty> listener);
|
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request);
|
||||||
|
|
||||||
public String getExecutor() {
|
public String getExecutor() {
|
||||||
return executor;
|
return executor;
|
||||||
|
|
|
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.persistent;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterStateListener;
|
import org.elasticsearch.cluster.ClusterStateListener;
|
||||||
|
@ -19,9 +18,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskCancelledException;
|
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -67,7 +64,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
// STARTED COMPLETED Noop - waiting for notification ack
|
// STARTED COMPLETED Noop - waiting for notification ack
|
||||||
|
|
||||||
// NULL NULL Noop - nothing to do
|
// NULL NULL Noop - nothing to do
|
||||||
// NULL STARTED Remove locally, Mark as CANCELLED, Cancel
|
// NULL STARTED Remove locally, Mark as PENDING_CANCEL, Cancel
|
||||||
// NULL COMPLETED Remove locally
|
// NULL COMPLETED Remove locally
|
||||||
|
|
||||||
// Master states:
|
// Master states:
|
||||||
|
@ -77,10 +74,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
// Local state:
|
// Local state:
|
||||||
// NULL - we don't have task registered locally in runningTasks
|
// NULL - we don't have task registered locally in runningTasks
|
||||||
// STARTED - registered in TaskManager, requires master notification when finishes
|
// STARTED - registered in TaskManager, requires master notification when finishes
|
||||||
// CANCELLED - registered in TaskManager, doesn't require master notification when finishes
|
// PENDING_CANCEL - registered in TaskManager, doesn't require master notification when finishes
|
||||||
// COMPLETED - not registered in TaskManager, notified, waiting for master to remove it from CS so we can remove locally
|
// COMPLETED - not registered in TaskManager, notified, waiting for master to remove it from CS so we can remove locally
|
||||||
|
|
||||||
// When task finishes if it is marked as STARTED or CANCELLED it is marked as COMPLETED and unregistered,
|
// When task finishes if it is marked as STARTED or PENDING_CANCEL it is marked as COMPLETED and unregistered,
|
||||||
// If the task was STARTED, the master notification is also triggered (this is handled by unregisterTask() method, which is
|
// If the task was STARTED, the master notification is also triggered (this is handled by unregisterTask() method, which is
|
||||||
// triggered by PersistentTaskListener
|
// triggered by PersistentTaskListener
|
||||||
|
|
||||||
|
@ -108,7 +105,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
AllocatedPersistentTask task = runningTasks.get(id);
|
AllocatedPersistentTask task = runningTasks.get(id);
|
||||||
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
|
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
|
||||||
// Result was sent to the caller and the caller acknowledged acceptance of the result
|
// Result was sent to the caller and the caller acknowledged acceptance of the result
|
||||||
finishTask(id);
|
runningTasks.remove(id);
|
||||||
} else {
|
} else {
|
||||||
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
|
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
|
||||||
// cancel the task without notifying master
|
// cancel the task without notifying master
|
||||||
|
@ -127,14 +124,13 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
taskInProgress.getRequest());
|
taskInProgress.getRequest());
|
||||||
boolean processed = false;
|
boolean processed = false;
|
||||||
try {
|
try {
|
||||||
task.init(persistentTasksService, taskInProgress.getId(), taskInProgress.getAllocationId());
|
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||||
PersistentTaskListener listener = new PersistentTaskListener(task);
|
|
||||||
try {
|
try {
|
||||||
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);
|
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);
|
||||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener);
|
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Submit task failure
|
// Submit task failure
|
||||||
listener.onFailure(e);
|
task.markAsFailed(e);
|
||||||
}
|
}
|
||||||
processed = true;
|
processed = true;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -145,16 +141,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Unregisters the locally running task. No notification to master will be send upon cancellation.
|
|
||||||
*/
|
|
||||||
private void finishTask(PersistentTaskId persistentTaskId) {
|
|
||||||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
|
||||||
if (task != null) {
|
|
||||||
taskManager.unregister(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon
|
* Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon
|
||||||
* cancellation.
|
* cancellation.
|
||||||
|
@ -181,65 +167,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void unregisterTask(AllocatedPersistentTask task, Exception e) {
|
|
||||||
AllocatedPersistentTask.State prevState = task.markAsCompleted(e);
|
|
||||||
if (prevState == AllocatedPersistentTask.State.CANCELLED) {
|
|
||||||
// The task was cancelled by master - no need to send notifications
|
|
||||||
taskManager.unregister(task);
|
|
||||||
} else if (prevState == AllocatedPersistentTask.State.STARTED) {
|
|
||||||
// The task finished locally, but master doesn't know about it - we need notify the master before we can unregister it
|
|
||||||
logger.trace("sending notification for completed task {}", task.getPersistentTaskId());
|
|
||||||
persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), e, new ActionListener<PersistentTask<?>>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(PersistentTask<?> persistentTask) {
|
|
||||||
logger.trace("notification for task {} was successful", task.getId());
|
|
||||||
taskManager.unregister(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
logger.warn((Supplier<?>) () ->
|
|
||||||
new ParameterizedMessage("notification for task {} failed", task.getPersistentTaskId()), e);
|
|
||||||
taskManager.unregister(task);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
logger.warn("attempt to complete task {} in the {} state", task.getPersistentTaskId(), prevState);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class PersistentTaskListener implements ActionListener<Empty> {
|
|
||||||
private final AllocatedPersistentTask task;
|
|
||||||
|
|
||||||
PersistentTaskListener(final AllocatedPersistentTask task) {
|
|
||||||
this.task = task;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onResponse(Empty response) {
|
|
||||||
unregisterTask(task, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
if (task.isCancelled()) {
|
|
||||||
// The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException
|
|
||||||
if (e instanceof TaskCancelledException == false) {
|
|
||||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
|
||||||
"cancelled task {} failed with an exception, cancellation reason [{}]",
|
|
||||||
task.getPersistentTaskId(), task.getReasonCancelled()), e);
|
|
||||||
}
|
|
||||||
if (CancelTasksRequest.DEFAULT_REASON.equals(task.getReasonCancelled())) {
|
|
||||||
unregisterTask(task, null);
|
|
||||||
} else {
|
|
||||||
unregisterTask(task, e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
unregisterTask(task, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class PersistentTaskId {
|
private static class PersistentTaskId {
|
||||||
private final long id;
|
private final long id;
|
||||||
private final long allocationId;
|
private final long allocationId;
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||||
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
|
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
|
||||||
|
@ -117,8 +116,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
assertThat(executor.size(), equalTo(2));
|
assertThat(executor.size(), equalTo(2));
|
||||||
|
|
||||||
// Finish both tasks
|
// Finish both tasks
|
||||||
executor.get(0).listener.onFailure(new RuntimeException());
|
executor.get(0).task.markAsFailed(new RuntimeException());
|
||||||
executor.get(1).listener.onResponse(Empty.INSTANCE);
|
executor.get(1).task.markAsCompleted();
|
||||||
long failedTaskId = executor.get(0).task.getParentTaskId().getId();
|
long failedTaskId = executor.get(0).task.getParentTaskId().getId();
|
||||||
long finishedTaskId = executor.get(1).task.getParentTaskId().getId();
|
long finishedTaskId = executor.get(1).task.getParentTaskId().getId();
|
||||||
executor.clear();
|
executor.clear();
|
||||||
|
@ -203,7 +202,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
// Make sure it returns correct status
|
// Make sure it returns correct status
|
||||||
assertThat(taskManager.getTasks().size(), equalTo(1));
|
assertThat(taskManager.getTasks().size(), equalTo(1));
|
||||||
assertThat(taskManager.getTasks().values().iterator().next().getStatus().toString(), equalTo("{\"state\":\"CANCELLED\"}"));
|
assertThat(taskManager.getTasks().values().iterator().next().getStatus().toString(), equalTo("{\"state\":\"PENDING_CANCEL\"}"));
|
||||||
|
|
||||||
|
|
||||||
// That should trigger cancellation request
|
// That should trigger cancellation request
|
||||||
|
@ -213,9 +212,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
// finish or fail task
|
// finish or fail task
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
executor.get(0).listener.onResponse(Empty.INSTANCE);
|
executor.get(0).task.markAsCompleted();
|
||||||
} else {
|
} else {
|
||||||
executor.get(0).listener.onFailure(new IOException("test"));
|
executor.get(0).task.markAsFailed(new IOException("test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the the task is now removed from task manager
|
// Check the the task is now removed from task manager
|
||||||
|
@ -251,14 +250,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
private final PersistentTaskRequest request;
|
private final PersistentTaskRequest request;
|
||||||
private final AllocatedPersistentTask task;
|
private final AllocatedPersistentTask task;
|
||||||
private final PersistentTasksExecutor<?> holder;
|
private final PersistentTasksExecutor<?> holder;
|
||||||
private final ActionListener<Empty> listener;
|
|
||||||
|
|
||||||
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder,
|
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
|
||||||
ActionListener<Empty> listener) {
|
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.task = task;
|
this.task = task;
|
||||||
this.holder = holder;
|
this.holder = holder;
|
||||||
this.listener = listener;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,9 +267,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
|
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
|
||||||
PersistentTasksExecutor<Request> action,
|
PersistentTasksExecutor<Request> action) {
|
||||||
ActionListener<Empty> listener) {
|
executions.add(new Execution(request, task, action));
|
||||||
executions.add(new Execution(request, task, action, listener));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Execution get(int i) {
|
public Execution get(int i) {
|
||||||
|
|
|
@ -45,7 +45,6 @@ import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskCancelledException;
|
import org.elasticsearch.tasks.TaskCancelledException;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||||
|
@ -326,7 +325,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request) {
|
||||||
logger.info("started node operation for the task {}", task);
|
logger.info("started node operation for the task {}", task);
|
||||||
try {
|
try {
|
||||||
TestTask testTask = (TestTask) task;
|
TestTask testTask = (TestTask) task;
|
||||||
|
@ -341,10 +340,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ("finish".equals(testTask.getOperation())) {
|
if ("finish".equals(testTask.getOperation())) {
|
||||||
listener.onResponse(Empty.INSTANCE);
|
task.markAsCompleted();
|
||||||
return;
|
return;
|
||||||
} else if ("fail".equals(testTask.getOperation())) {
|
} else if ("fail".equals(testTask.getOperation())) {
|
||||||
listener.onFailure(new RuntimeException("Simulating failure"));
|
task.markAsFailed(new RuntimeException("Simulating failure"));
|
||||||
return;
|
return;
|
||||||
} else if ("update_status".equals(testTask.getOperation())) {
|
} else if ("update_status".equals(testTask.getOperation())) {
|
||||||
testTask.setOperation(null);
|
testTask.setOperation(null);
|
||||||
|
@ -370,12 +369,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
// Cancellation make cause different ways for the task to finish
|
// Cancellation make cause different ways for the task to finish
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
listener.onFailure(new TaskCancelledException(testTask.getReasonCancelled()));
|
task.markAsFailed(new TaskCancelledException(testTask.getReasonCancelled()));
|
||||||
} else {
|
} else {
|
||||||
listener.onResponse(Empty.INSTANCE);
|
task.markAsCompleted();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
listener.onFailure(new RuntimeException(testTask.getReasonCancelled()));
|
task.markAsFailed(new RuntimeException(testTask.getReasonCancelled()));
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
@ -383,7 +382,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
listener.onFailure(e);
|
task.markAsFailed(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue