Persistent Tasks: Merge NodePersistentTask and RunningPersistentTask (elastic/x-pack-elasticsearch#842)
Refactors NodePersistentTask and RunningPersistentTask into a single AllocatedPersistentTask. Makes it possible to update Persistent Task Status via AllocatedPersistentTask. Original commit: elastic/x-pack-elasticsearch@8f59d7b819
This commit is contained in:
parent
9b4d399fc3
commit
9fc6ce83ee
|
@ -373,9 +373,9 @@
|
|||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]notification[/\\]slack[/\\]message[/\\]Field.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]notification[/\\]slack[/\\]message[/\\]SlackMessage.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]notification[/\\]slack[/\\]message[/\\]SlackMessageDefaults.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]AllocatedPersistentTask.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]CompletionPersistentTaskAction.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]CreatePersistentTaskAction.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]NodePersistentTask.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]NodePersistentTasksExecutor.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]PersistentTaskRequest.java" checks="LineLength" />
|
||||
<suppress files="plugin[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]xpack[/\\]persistent[/\\]PersistentTaskResponse.java" checks="LineLength" />
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
|
|||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
|
||||
import org.elasticsearch.xpack.persistent.NodePersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
|
@ -251,7 +251,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
|
||||
}
|
||||
|
||||
public static class JobTask extends NodePersistentTask {
|
||||
public static class JobTask extends AllocatedPersistentTask {
|
||||
|
||||
private final String jobId;
|
||||
private volatile AutodetectProcessManager autodetectProcessManager;
|
||||
|
@ -380,7 +380,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(NodePersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<TransportResponse.Empty> listener) {
|
||||
JobTask jobTask = (JobTask) task;
|
||||
jobTask.autodetectProcessManager = autodetectProcessManager;
|
||||
autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> {
|
||||
|
|
|
@ -56,7 +56,7 @@ import org.elasticsearch.xpack.ml.job.messages.Messages;
|
|||
import org.elasticsearch.xpack.ml.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver;
|
||||
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.xpack.persistent.NodePersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
|
@ -292,7 +292,7 @@ public class StartDatafeedAction
|
|||
}
|
||||
}
|
||||
|
||||
public static class DatafeedTask extends NodePersistentTask {
|
||||
public static class DatafeedTask extends AllocatedPersistentTask {
|
||||
|
||||
private final String datafeedId;
|
||||
private final long startTime;
|
||||
|
@ -421,9 +421,9 @@ public class StartDatafeedAction
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request,
|
||||
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, Request request,
|
||||
ActionListener<TransportResponse.Empty> listener) {
|
||||
DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask;
|
||||
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
|
||||
datafeedTask.datafeedJobRunner = datafeedJobRunner;
|
||||
datafeedJobRunner.run(datafeedTask,
|
||||
(error) -> {
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Represents a executor node operation that corresponds to a persistent task
|
||||
*/
|
||||
public class AllocatedPersistentTask extends CancellableTask {
|
||||
private long persistentTaskId;
|
||||
|
||||
private final AtomicReference<State> state;
|
||||
@Nullable
|
||||
private Exception failure;
|
||||
|
||||
private PersistentTasksService persistentTasksService;
|
||||
|
||||
|
||||
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
super(id, type, action, description, parentTask);
|
||||
this.state = new AtomicReference<>(State.STARTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// In case of persistent tasks we always need to return: `false`
|
||||
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
|
||||
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
|
||||
// a fake parent node id "cluster" in the cluster state
|
||||
@Override
|
||||
public final boolean cancelOnParentLeaving() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status getStatus() {
|
||||
return new PersistentTasksNodeService.Status(state.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the persistent state for the corresponding persistent task.
|
||||
*
|
||||
* This doesn't affect the status of this allocated task.
|
||||
*/
|
||||
public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) {
|
||||
persistentTasksService.updateStatus(persistentTaskId, status, listener);
|
||||
}
|
||||
|
||||
public long getPersistentTaskId() {
|
||||
return persistentTaskId;
|
||||
}
|
||||
|
||||
void init(PersistentTasksService persistentTasksService, long persistentTaskId) {
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.persistentTaskId = persistentTaskId;
|
||||
}
|
||||
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
boolean startNotification(Exception failure) {
|
||||
boolean result = state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.FAILED);
|
||||
if (result) {
|
||||
this.failure = failure;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
boolean notificationFailed() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.FAILED_NOTIFICATION);
|
||||
}
|
||||
|
||||
boolean restartCompletionNotification() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED_NOTIFICATION, AllocatedPersistentTask.State.FAILED);
|
||||
}
|
||||
|
||||
boolean markAsNotified() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.NOTIFIED);
|
||||
}
|
||||
|
||||
boolean markAsCancelled() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.CANCELLED);
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public enum State {
|
||||
STARTED, // the task is currently running
|
||||
CANCELLED, // the task is cancelled
|
||||
FAILED, // the task is done running and trying to notify caller
|
||||
FAILED_NOTIFICATION, // the caller notification failed
|
||||
NOTIFIED // the caller was notified, the task can be removed
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.persistent;
|
||||
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
/**
|
||||
* Represents a executor node operation that corresponds to a persistent task
|
||||
*/
|
||||
public class NodePersistentTask extends CancellableTask {
|
||||
private Provider<Status> statusProvider;
|
||||
|
||||
private long persistentTaskId;
|
||||
|
||||
public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
super(id, type, action, description, parentTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// In case of persistent tasks we always need to return: `false`
|
||||
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
|
||||
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
|
||||
// a fake parent node id "cluster" in the cluster state
|
||||
@Override
|
||||
public final boolean cancelOnParentLeaving() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status getStatus() {
|
||||
Provider<Status> statusProvider = this.statusProvider;
|
||||
if (statusProvider != null) {
|
||||
return statusProvider.get();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setStatusProvider(Provider<Status> statusProvider) {
|
||||
assert this.statusProvider == null;
|
||||
this.statusProvider = statusProvider;
|
||||
}
|
||||
|
||||
public long getPersistentTaskId() {
|
||||
return persistentTaskId;
|
||||
}
|
||||
|
||||
public void setPersistentTaskId(long persistentTaskId) {
|
||||
this.persistentTaskId = persistentTaskId;
|
||||
}
|
||||
|
||||
}
|
|
@ -21,7 +21,7 @@ public class NodePersistentTasksExecutor {
|
|||
}
|
||||
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request,
|
||||
NodePersistentTask task,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Request> action,
|
||||
ActionListener<Empty> listener) {
|
||||
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
|
||||
|
|
|
@ -17,6 +17,6 @@ import org.elasticsearch.tasks.TaskId;
|
|||
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new NodePersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
}
|
||||
}
|
|
@ -93,7 +93,7 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
|||
* The status can be used to store the current progress of the task or provide an insight for the
|
||||
* task allocator about the state of the currently running tasks.
|
||||
*/
|
||||
protected void updatePersistentTaskStatus(NodePersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
protected void updatePersistentTaskStatus(AllocatedPersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
persistentTasksService.updateStatus(task.getPersistentTaskId(), status,
|
||||
new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
|
@ -115,7 +115,7 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
|||
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
|
||||
* completed and will be removed from the cluster state and not restarted.
|
||||
*/
|
||||
protected abstract void nodeOperation(NodePersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
|
||||
public String getExecutor() {
|
||||
return executor;
|
||||
|
|
|
@ -11,10 +11,8 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -34,7 +32,6 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
|
@ -43,7 +40,7 @@ import static java.util.Objects.requireNonNull;
|
|||
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
|
||||
*/
|
||||
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
|
||||
private final Map<PersistentTaskId, RunningPersistentTask> runningTasks = new HashMap<>();
|
||||
private final Map<PersistentTaskId, AllocatedPersistentTask> runningTasks = new HashMap<>();
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
|
||||
private final TaskManager taskManager;
|
||||
|
@ -77,14 +74,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
|
||||
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
|
||||
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);
|
||||
AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId);
|
||||
if (persistentTask == null) {
|
||||
// New task - let's start it
|
||||
startTask(taskInProgress);
|
||||
} else {
|
||||
// The task is still running
|
||||
notVisitedTasks.remove(persistentTaskId);
|
||||
if (persistentTask.getState() == State.FAILED_NOTIFICATION) {
|
||||
if (persistentTask.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) {
|
||||
// We tried to notify the master about this task before but the notification failed and
|
||||
// the master doesn't seem to know about it - retry notification
|
||||
restartCompletionNotification(persistentTask);
|
||||
|
@ -95,14 +92,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
for (PersistentTaskId id : notVisitedTasks) {
|
||||
RunningPersistentTask task = runningTasks.get(id);
|
||||
if (task.getState() == State.NOTIFIED || task.getState() == State.FAILED) {
|
||||
AllocatedPersistentTask task = runningTasks.get(id);
|
||||
if (task.getState() == AllocatedPersistentTask.State.NOTIFIED || task.getState() == AllocatedPersistentTask.State.FAILED) {
|
||||
// Result was sent to the caller and the caller acknowledged acceptance of the result
|
||||
finishTask(id);
|
||||
} else if (task.getState() == State.FAILED_NOTIFICATION) {
|
||||
} else if (task.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) {
|
||||
// We tried to send result to master, but it failed and master doesn't know about this task
|
||||
// this shouldn't really happen, unless this node is severally out of sync with the master
|
||||
logger.warn("failed to notify master about task {}", task.getId());
|
||||
logger.warn("failed to notify master about task {}", task.getPersistentTaskId());
|
||||
finishTask(id);
|
||||
} else {
|
||||
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
|
||||
|
@ -117,16 +114,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
private <Request extends PersistentTaskRequest> void startTask(PersistentTask<Request> taskInProgress) {
|
||||
PersistentTasksExecutor<Request> action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
|
||||
NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||
taskInProgress.getRequest());
|
||||
boolean processed = false;
|
||||
try {
|
||||
RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId());
|
||||
task.setStatusProvider(runningPersistentTask);
|
||||
task.setPersistentTaskId(taskInProgress.getId());
|
||||
PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask);
|
||||
task.init(persistentTasksService, taskInProgress.getId());
|
||||
PersistentTaskListener listener = new PersistentTaskListener(task);
|
||||
try {
|
||||
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask);
|
||||
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener);
|
||||
} catch (Exception e) {
|
||||
// Submit task failure
|
||||
|
@ -142,17 +137,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
private void finishTask(PersistentTaskId persistentTaskId) {
|
||||
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null && task.getTask() != null) {
|
||||
taskManager.unregister(task.getTask());
|
||||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null) {
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelTask(PersistentTaskId persistentTaskId) {
|
||||
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null && task.getTask() != null) {
|
||||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null) {
|
||||
if (task.markAsCancelled()) {
|
||||
persistentTasksService.sendCancellation(task.getTask().getId(), new PersistentTaskOperationListener() {
|
||||
persistentTasksService.sendCancellation(task.getId(), new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("Persistent task with id {} was cancelled", taskId);
|
||||
|
@ -162,7 +157,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// There is really nothing we can do in case of failure here
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getId()), e);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -170,10 +165,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
|
||||
private void restartCompletionNotification(RunningPersistentTask task) {
|
||||
logger.trace("resending notification for task {}", task.getId());
|
||||
if (task.getState() == State.CANCELLED) {
|
||||
taskManager.unregister(task.getTask());
|
||||
private void restartCompletionNotification(AllocatedPersistentTask task) {
|
||||
logger.trace("resending notification for task {}", task.getPersistentTaskId());
|
||||
if (task.getState() == AllocatedPersistentTask.State.CANCELLED) {
|
||||
taskManager.unregister(task);
|
||||
} else {
|
||||
if (task.restartCompletionNotification()) {
|
||||
// Need to fork otherwise: java.lang.AssertionError: should not be called by a cluster state applier.
|
||||
|
@ -188,35 +183,35 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
persistentTasksService.sendCompletionNotification(task.getId(), task.getFailure(), listener);
|
||||
persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), task.getFailure(), listener);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
logger.warn("attempt to resend notification for task {} in the {} state", task.getId(), task.getState());
|
||||
logger.warn("attempt to resend notification for task {} in the {} state", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void startCompletionNotification(RunningPersistentTask task, Exception e) {
|
||||
if (task.getState() == State.CANCELLED) {
|
||||
taskManager.unregister(task.getTask());
|
||||
private void startCompletionNotification(AllocatedPersistentTask task, Exception e) {
|
||||
if (task.getState() == AllocatedPersistentTask.State.CANCELLED) {
|
||||
taskManager.unregister(task);
|
||||
} else {
|
||||
logger.trace("sending notification for failed task {}", task.getId());
|
||||
logger.trace("sending notification for failed task {}", task.getPersistentTaskId());
|
||||
if (task.startNotification(e)) {
|
||||
persistentTasksService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task));
|
||||
persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), e, new PublishedResponseListener(task));
|
||||
} else {
|
||||
logger.warn("attempt to send notification for task {} in the {} state", task.getId(), task.getState());
|
||||
logger.warn("attempt to send notification for task {} in the {} state", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class PersistentTaskListener implements ActionListener<Empty> {
|
||||
private final RunningPersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
|
||||
PersistentTaskListener(final RunningPersistentTask task) {
|
||||
PersistentTaskListener(final AllocatedPersistentTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
|
@ -227,14 +222,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (task.getTask().isCancelled()) {
|
||||
if (task.isCancelled()) {
|
||||
// The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException
|
||||
if (e instanceof TaskCancelledException == false) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
||||
"cancelled task {} failed with an exception, cancellation reason [{}]",
|
||||
task.getId(), task.getTask().getReasonCancelled()), e);
|
||||
task.getPersistentTaskId(), task.getReasonCancelled()), e);
|
||||
}
|
||||
if (CancelTasksRequest.DEFAULT_REASON.equals(task.getTask().getReasonCancelled())) {
|
||||
if (CancelTasksRequest.DEFAULT_REASON.equals(task.getReasonCancelled())) {
|
||||
startCompletionNotification(task, null);
|
||||
} else {
|
||||
startCompletionNotification(task, e);
|
||||
|
@ -246,39 +241,31 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
private class PublishedResponseListener implements PersistentTaskOperationListener {
|
||||
private final RunningPersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
|
||||
PublishedResponseListener(final RunningPersistentTask task) {
|
||||
PublishedResponseListener(final AllocatedPersistentTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("notification for task {} was successful", task.getId());
|
||||
logger.trace("notification for task {} was successful", task.getPersistentTaskId());
|
||||
if (task.markAsNotified() == false) {
|
||||
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getId(), task.getState());
|
||||
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
taskManager.unregister(task.getTask());
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getId()), e);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getPersistentTaskId()), e);
|
||||
if (task.notificationFailed() == false) {
|
||||
logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getId(), task.getState());
|
||||
logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public enum State {
|
||||
STARTED, // the task is currently running
|
||||
CANCELLED, // the task is cancelled
|
||||
FAILED, // the task is done running and trying to notify caller
|
||||
FAILED_NOTIFICATION, // the caller notification failed
|
||||
NOTIFIED // the caller was notified, the task can be removed
|
||||
}
|
||||
|
||||
private static class PersistentTaskId {
|
||||
private final long id;
|
||||
private final long allocationId;
|
||||
|
@ -304,80 +291,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
}
|
||||
|
||||
private static class RunningPersistentTask implements Provider<Task.Status> {
|
||||
private final NodePersistentTask task;
|
||||
private final long id;
|
||||
private final AtomicReference<State> state;
|
||||
@Nullable
|
||||
private Exception failure;
|
||||
|
||||
RunningPersistentTask(NodePersistentTask task, long id) {
|
||||
this(task, id, State.STARTED);
|
||||
}
|
||||
|
||||
RunningPersistentTask(NodePersistentTask task, long id, State state) {
|
||||
this.task = task;
|
||||
this.id = id;
|
||||
this.state = new AtomicReference<>(state);
|
||||
}
|
||||
|
||||
public NodePersistentTask getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
public boolean startNotification(Exception failure) {
|
||||
boolean result = state.compareAndSet(State.STARTED, State.FAILED);
|
||||
if (result) {
|
||||
this.failure = failure;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean notificationFailed() {
|
||||
return state.compareAndSet(State.FAILED, State.FAILED_NOTIFICATION);
|
||||
}
|
||||
|
||||
public boolean restartCompletionNotification() {
|
||||
return state.compareAndSet(State.FAILED_NOTIFICATION, State.FAILED);
|
||||
}
|
||||
|
||||
public boolean markAsNotified() {
|
||||
return state.compareAndSet(State.FAILED, State.NOTIFIED);
|
||||
}
|
||||
|
||||
public boolean markAsCancelled() {
|
||||
return state.compareAndSet(State.STARTED, State.CANCELLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task.Status get() {
|
||||
return new Status(state.get());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Status implements Task.Status {
|
||||
public static final String NAME = "persistent_executor";
|
||||
|
||||
private final State state;
|
||||
private final AllocatedPersistentTask.State state;
|
||||
|
||||
public Status(State state) {
|
||||
public Status(AllocatedPersistentTask.State state) {
|
||||
this.state = requireNonNull(state, "State cannot be null");
|
||||
}
|
||||
|
||||
public Status(StreamInput in) throws IOException {
|
||||
state = State.valueOf(in.readString());
|
||||
state = AllocatedPersistentTask.State.valueOf(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -403,7 +327,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
public AllocatedPersistentTask.State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ package org.elasticsearch.xpack.persistent;
|
|||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService.State;
|
||||
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService.Status;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -16,7 +15,7 @@ public class PersistentTasksNodeServiceStatusTests extends AbstractWireSerializi
|
|||
|
||||
@Override
|
||||
protected Status createTestInstance() {
|
||||
return new Status(randomFrom(State.values()));
|
||||
return new Status(randomFrom(AllocatedPersistentTask.State.values()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -324,11 +324,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
|
||||
private class Execution {
|
||||
private final PersistentTaskRequest request;
|
||||
private final NodePersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
private final PersistentTasksExecutor<?> holder;
|
||||
private final ActionListener<Empty> listener;
|
||||
|
||||
Execution(PersistentTaskRequest request, NodePersistentTask task, PersistentTasksExecutor<?> holder,
|
||||
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
this.request = request;
|
||||
this.task = task;
|
||||
|
@ -345,7 +345,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request, NodePersistentTask task,
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Request> action,
|
||||
ActionListener<Empty> listener) {
|
||||
executions.add(new Execution(request, task, action, listener));
|
||||
|
|
|
@ -326,7 +326,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
logger.info("started node operation for the task {}", task);
|
||||
try {
|
||||
TestTask testTask = (TestTask) task;
|
||||
|
@ -409,7 +409,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
|
||||
public static class TestTask extends NodePersistentTask {
|
||||
public static class TestTask extends AllocatedPersistentTask {
|
||||
private volatile String operation;
|
||||
|
||||
public TestTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
|
|
Loading…
Reference in New Issue