Persistent Tasks: Merge NodePersistentTask and RunningPersistentTask (#842)
Refactors NodePersistentTask and RunningPersistentTask into a single AllocatedPersistentTask. Makes it possible to update Persistent Task Status via AllocatedPersistentTask.
This commit is contained in:
parent
19f39fd392
commit
37fad04879
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.persistent;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Represents a executor node operation that corresponds to a persistent task
|
||||
*/
|
||||
public class AllocatedPersistentTask extends CancellableTask {
|
||||
private long persistentTaskId;
|
||||
|
||||
private final AtomicReference<State> state;
|
||||
@Nullable
|
||||
private Exception failure;
|
||||
|
||||
private PersistentTasksService persistentTasksService;
|
||||
|
||||
|
||||
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
super(id, type, action, description, parentTask);
|
||||
this.state = new AtomicReference<>(State.STARTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// In case of persistent tasks we always need to return: `false`
|
||||
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
|
||||
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
|
||||
// a fake parent node id "cluster" in the cluster state
|
||||
@Override
|
||||
public final boolean cancelOnParentLeaving() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status getStatus() {
|
||||
return new PersistentTasksNodeService.Status(state.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the persistent state for the corresponding persistent task.
|
||||
*
|
||||
* This doesn't affect the status of this allocated task.
|
||||
*/
|
||||
public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) {
|
||||
persistentTasksService.updateStatus(persistentTaskId, status, listener);
|
||||
}
|
||||
|
||||
public long getPersistentTaskId() {
|
||||
return persistentTaskId;
|
||||
}
|
||||
|
||||
void init(PersistentTasksService persistentTasksService, long persistentTaskId) {
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.persistentTaskId = persistentTaskId;
|
||||
}
|
||||
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
boolean startNotification(Exception failure) {
|
||||
boolean result = state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.FAILED);
|
||||
if (result) {
|
||||
this.failure = failure;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
boolean notificationFailed() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.FAILED_NOTIFICATION);
|
||||
}
|
||||
|
||||
boolean restartCompletionNotification() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED_NOTIFICATION, AllocatedPersistentTask.State.FAILED);
|
||||
}
|
||||
|
||||
boolean markAsNotified() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.FAILED, AllocatedPersistentTask.State.NOTIFIED);
|
||||
}
|
||||
|
||||
boolean markAsCancelled() {
|
||||
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.CANCELLED);
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public enum State {
|
||||
STARTED, // the task is currently running
|
||||
CANCELLED, // the task is cancelled
|
||||
FAILED, // the task is done running and trying to notify caller
|
||||
FAILED_NOTIFICATION, // the caller notification failed
|
||||
NOTIFIED // the caller was notified, the task can be removed
|
||||
}
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.persistent;
|
||||
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
/**
|
||||
* Represents a executor node operation that corresponds to a persistent task
|
||||
*/
|
||||
public class NodePersistentTask extends CancellableTask {
|
||||
private Provider<Status> statusProvider;
|
||||
|
||||
private long persistentTaskId;
|
||||
|
||||
public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
super(id, type, action, description, parentTask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCancelChildrenOnCancellation() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// In case of persistent tasks we always need to return: `false`
|
||||
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
|
||||
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
|
||||
// a fake parent node id "cluster" in the cluster state
|
||||
@Override
|
||||
public final boolean cancelOnParentLeaving() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Status getStatus() {
|
||||
Provider<Status> statusProvider = this.statusProvider;
|
||||
if (statusProvider != null) {
|
||||
return statusProvider.get();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setStatusProvider(Provider<Status> statusProvider) {
|
||||
assert this.statusProvider == null;
|
||||
this.statusProvider = statusProvider;
|
||||
}
|
||||
|
||||
public long getPersistentTaskId() {
|
||||
return persistentTaskId;
|
||||
}
|
||||
|
||||
public void setPersistentTaskId(long persistentTaskId) {
|
||||
this.persistentTaskId = persistentTaskId;
|
||||
}
|
||||
|
||||
}
|
|
@ -34,7 +34,7 @@ public class NodePersistentTasksExecutor {
|
|||
}
|
||||
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request,
|
||||
NodePersistentTask task,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Request> action,
|
||||
ActionListener<Empty> listener) {
|
||||
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
|
||||
|
|
|
@ -30,6 +30,6 @@ import org.elasticsearch.tasks.TaskId;
|
|||
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new NodePersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
|||
* The status can be used to store the current progress of the task or provide an insight for the
|
||||
* task allocator about the state of the currently running tasks.
|
||||
*/
|
||||
protected void updatePersistentTaskStatus(NodePersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
protected void updatePersistentTaskStatus(AllocatedPersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
persistentTasksService.updateStatus(task.getPersistentTaskId(), status,
|
||||
new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
|
@ -129,7 +129,7 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
|
|||
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
|
||||
* completed and will be removed from the cluster state and not restarted.
|
||||
*/
|
||||
protected abstract void nodeOperation(NodePersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
|
||||
public String getExecutor() {
|
||||
return executor;
|
||||
|
|
|
@ -24,10 +24,8 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -47,7 +45,6 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
|
@ -56,7 +53,7 @@ import static java.util.Objects.requireNonNull;
|
|||
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
|
||||
*/
|
||||
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
|
||||
private final Map<PersistentTaskId, RunningPersistentTask> runningTasks = new HashMap<>();
|
||||
private final Map<PersistentTaskId, AllocatedPersistentTask> runningTasks = new HashMap<>();
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
|
||||
private final TaskManager taskManager;
|
||||
|
@ -90,14 +87,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
|
||||
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
|
||||
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);
|
||||
AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId);
|
||||
if (persistentTask == null) {
|
||||
// New task - let's start it
|
||||
startTask(taskInProgress);
|
||||
} else {
|
||||
// The task is still running
|
||||
notVisitedTasks.remove(persistentTaskId);
|
||||
if (persistentTask.getState() == State.FAILED_NOTIFICATION) {
|
||||
if (persistentTask.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) {
|
||||
// We tried to notify the master about this task before but the notification failed and
|
||||
// the master doesn't seem to know about it - retry notification
|
||||
restartCompletionNotification(persistentTask);
|
||||
|
@ -108,14 +105,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
for (PersistentTaskId id : notVisitedTasks) {
|
||||
RunningPersistentTask task = runningTasks.get(id);
|
||||
if (task.getState() == State.NOTIFIED || task.getState() == State.FAILED) {
|
||||
AllocatedPersistentTask task = runningTasks.get(id);
|
||||
if (task.getState() == AllocatedPersistentTask.State.NOTIFIED || task.getState() == AllocatedPersistentTask.State.FAILED) {
|
||||
// Result was sent to the caller and the caller acknowledged acceptance of the result
|
||||
finishTask(id);
|
||||
} else if (task.getState() == State.FAILED_NOTIFICATION) {
|
||||
} else if (task.getState() == AllocatedPersistentTask.State.FAILED_NOTIFICATION) {
|
||||
// We tried to send result to master, but it failed and master doesn't know about this task
|
||||
// this shouldn't really happen, unless this node is severally out of sync with the master
|
||||
logger.warn("failed to notify master about task {}", task.getId());
|
||||
logger.warn("failed to notify master about task {}", task.getPersistentTaskId());
|
||||
finishTask(id);
|
||||
} else {
|
||||
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
|
||||
|
@ -130,16 +127,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
private <Request extends PersistentTaskRequest> void startTask(PersistentTask<Request> taskInProgress) {
|
||||
PersistentTasksExecutor<Request> action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
|
||||
NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
|
||||
taskInProgress.getRequest());
|
||||
boolean processed = false;
|
||||
try {
|
||||
RunningPersistentTask runningPersistentTask = new RunningPersistentTask(task, taskInProgress.getId());
|
||||
task.setStatusProvider(runningPersistentTask);
|
||||
task.setPersistentTaskId(taskInProgress.getId());
|
||||
PersistentTaskListener listener = new PersistentTaskListener(runningPersistentTask);
|
||||
task.init(persistentTasksService, taskInProgress.getId());
|
||||
PersistentTaskListener listener = new PersistentTaskListener(task);
|
||||
try {
|
||||
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), runningPersistentTask);
|
||||
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action, listener);
|
||||
} catch (Exception e) {
|
||||
// Submit task failure
|
||||
|
@ -155,17 +150,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
private void finishTask(PersistentTaskId persistentTaskId) {
|
||||
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null && task.getTask() != null) {
|
||||
taskManager.unregister(task.getTask());
|
||||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null) {
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelTask(PersistentTaskId persistentTaskId) {
|
||||
RunningPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null && task.getTask() != null) {
|
||||
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
|
||||
if (task != null) {
|
||||
if (task.markAsCancelled()) {
|
||||
persistentTasksService.sendCancellation(task.getTask().getId(), new PersistentTaskOperationListener() {
|
||||
persistentTasksService.sendCancellation(task.getId(), new PersistentTaskOperationListener() {
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("Persistent task with id {} was cancelled", taskId);
|
||||
|
@ -175,7 +170,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// There is really nothing we can do in case of failure here
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getId()), e);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -183,10 +178,10 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
|
||||
private void restartCompletionNotification(RunningPersistentTask task) {
|
||||
logger.trace("resending notification for task {}", task.getId());
|
||||
if (task.getState() == State.CANCELLED) {
|
||||
taskManager.unregister(task.getTask());
|
||||
private void restartCompletionNotification(AllocatedPersistentTask task) {
|
||||
logger.trace("resending notification for task {}", task.getPersistentTaskId());
|
||||
if (task.getState() == AllocatedPersistentTask.State.CANCELLED) {
|
||||
taskManager.unregister(task);
|
||||
} else {
|
||||
if (task.restartCompletionNotification()) {
|
||||
// Need to fork otherwise: java.lang.AssertionError: should not be called by a cluster state applier.
|
||||
|
@ -201,35 +196,35 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
persistentTasksService.sendCompletionNotification(task.getId(), task.getFailure(), listener);
|
||||
persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), task.getFailure(), listener);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
} else {
|
||||
logger.warn("attempt to resend notification for task {} in the {} state", task.getId(), task.getState());
|
||||
logger.warn("attempt to resend notification for task {} in the {} state", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void startCompletionNotification(RunningPersistentTask task, Exception e) {
|
||||
if (task.getState() == State.CANCELLED) {
|
||||
taskManager.unregister(task.getTask());
|
||||
private void startCompletionNotification(AllocatedPersistentTask task, Exception e) {
|
||||
if (task.getState() == AllocatedPersistentTask.State.CANCELLED) {
|
||||
taskManager.unregister(task);
|
||||
} else {
|
||||
logger.trace("sending notification for failed task {}", task.getId());
|
||||
logger.trace("sending notification for failed task {}", task.getPersistentTaskId());
|
||||
if (task.startNotification(e)) {
|
||||
persistentTasksService.sendCompletionNotification(task.getId(), e, new PublishedResponseListener(task));
|
||||
persistentTasksService.sendCompletionNotification(task.getPersistentTaskId(), e, new PublishedResponseListener(task));
|
||||
} else {
|
||||
logger.warn("attempt to send notification for task {} in the {} state", task.getId(), task.getState());
|
||||
logger.warn("attempt to send notification for task {} in the {} state", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class PersistentTaskListener implements ActionListener<Empty> {
|
||||
private final RunningPersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
|
||||
PersistentTaskListener(final RunningPersistentTask task) {
|
||||
PersistentTaskListener(final AllocatedPersistentTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
|
@ -240,14 +235,14 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
if (task.getTask().isCancelled()) {
|
||||
if (task.isCancelled()) {
|
||||
// The task was explicitly cancelled - no need to restart it, just log the exception if it's not TaskCancelledException
|
||||
if (e instanceof TaskCancelledException == false) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
|
||||
"cancelled task {} failed with an exception, cancellation reason [{}]",
|
||||
task.getId(), task.getTask().getReasonCancelled()), e);
|
||||
task.getPersistentTaskId(), task.getReasonCancelled()), e);
|
||||
}
|
||||
if (CancelTasksRequest.DEFAULT_REASON.equals(task.getTask().getReasonCancelled())) {
|
||||
if (CancelTasksRequest.DEFAULT_REASON.equals(task.getReasonCancelled())) {
|
||||
startCompletionNotification(task, null);
|
||||
} else {
|
||||
startCompletionNotification(task, e);
|
||||
|
@ -259,39 +254,31 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
|
||||
private class PublishedResponseListener implements PersistentTaskOperationListener {
|
||||
private final RunningPersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
|
||||
PublishedResponseListener(final RunningPersistentTask task) {
|
||||
PublishedResponseListener(final AllocatedPersistentTask task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onResponse(long taskId) {
|
||||
logger.trace("notification for task {} was successful", task.getId());
|
||||
logger.trace("notification for task {} was successful", task.getPersistentTaskId());
|
||||
if (task.markAsNotified() == false) {
|
||||
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getId(), task.getState());
|
||||
logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
taskManager.unregister(task.getTask());
|
||||
taskManager.unregister(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getId()), e);
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("notification for task {} failed - retrying", task.getPersistentTaskId()), e);
|
||||
if (task.notificationFailed() == false) {
|
||||
logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getId(), task.getState());
|
||||
logger.warn("attempt to mark restart notification for task {} in the {} state failed", task.getPersistentTaskId(), task.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public enum State {
|
||||
STARTED, // the task is currently running
|
||||
CANCELLED, // the task is cancelled
|
||||
FAILED, // the task is done running and trying to notify caller
|
||||
FAILED_NOTIFICATION, // the caller notification failed
|
||||
NOTIFIED // the caller was notified, the task can be removed
|
||||
}
|
||||
|
||||
private static class PersistentTaskId {
|
||||
private final long id;
|
||||
private final long allocationId;
|
||||
|
@ -317,80 +304,17 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
}
|
||||
|
||||
private static class RunningPersistentTask implements Provider<Task.Status> {
|
||||
private final NodePersistentTask task;
|
||||
private final long id;
|
||||
private final AtomicReference<State> state;
|
||||
@Nullable
|
||||
private Exception failure;
|
||||
|
||||
RunningPersistentTask(NodePersistentTask task, long id) {
|
||||
this(task, id, State.STARTED);
|
||||
}
|
||||
|
||||
RunningPersistentTask(NodePersistentTask task, long id, State state) {
|
||||
this.task = task;
|
||||
this.id = id;
|
||||
this.state = new AtomicReference<>(state);
|
||||
}
|
||||
|
||||
public NodePersistentTask getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
public boolean startNotification(Exception failure) {
|
||||
boolean result = state.compareAndSet(State.STARTED, State.FAILED);
|
||||
if (result) {
|
||||
this.failure = failure;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean notificationFailed() {
|
||||
return state.compareAndSet(State.FAILED, State.FAILED_NOTIFICATION);
|
||||
}
|
||||
|
||||
public boolean restartCompletionNotification() {
|
||||
return state.compareAndSet(State.FAILED_NOTIFICATION, State.FAILED);
|
||||
}
|
||||
|
||||
public boolean markAsNotified() {
|
||||
return state.compareAndSet(State.FAILED, State.NOTIFIED);
|
||||
}
|
||||
|
||||
public boolean markAsCancelled() {
|
||||
return state.compareAndSet(State.STARTED, State.CANCELLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task.Status get() {
|
||||
return new Status(state.get());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Status implements Task.Status {
|
||||
public static final String NAME = "persistent_executor";
|
||||
|
||||
private final State state;
|
||||
private final AllocatedPersistentTask.State state;
|
||||
|
||||
public Status(State state) {
|
||||
public Status(AllocatedPersistentTask.State state) {
|
||||
this.state = requireNonNull(state, "State cannot be null");
|
||||
}
|
||||
|
||||
public Status(StreamInput in) throws IOException {
|
||||
state = State.valueOf(in.readString());
|
||||
state = AllocatedPersistentTask.State.valueOf(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -416,7 +340,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
public State getState() {
|
||||
public AllocatedPersistentTask.State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.persistent;
|
|||
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.persistent.PersistentTasksNodeService.State;
|
||||
import org.elasticsearch.persistent.PersistentTasksNodeService.Status;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -29,7 +28,7 @@ public class PersistentTasksNodeServiceStatusTests extends AbstractWireSerializi
|
|||
|
||||
@Override
|
||||
protected Status createTestInstance() {
|
||||
return new Status(randomFrom(State.values()));
|
||||
return new Status(randomFrom(AllocatedPersistentTask.State.values()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -338,11 +338,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
|
||||
private class Execution {
|
||||
private final PersistentTaskRequest request;
|
||||
private final NodePersistentTask task;
|
||||
private final AllocatedPersistentTask task;
|
||||
private final PersistentTasksExecutor<?> holder;
|
||||
private final ActionListener<Empty> listener;
|
||||
|
||||
Execution(PersistentTaskRequest request, NodePersistentTask task, PersistentTasksExecutor<?> holder,
|
||||
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
this.request = request;
|
||||
this.task = task;
|
||||
|
@ -359,7 +359,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request, NodePersistentTask task,
|
||||
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Request> action,
|
||||
ActionListener<Empty> listener) {
|
||||
executions.add(new Execution(request, task, action, listener));
|
||||
|
|
|
@ -340,7 +340,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
logger.info("started node operation for the task {}", task);
|
||||
try {
|
||||
TestTask testTask = (TestTask) task;
|
||||
|
@ -423,7 +423,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
|
||||
public static class TestTask extends NodePersistentTask {
|
||||
public static class TestTask extends AllocatedPersistentTask {
|
||||
private volatile String operation;
|
||||
|
||||
public TestTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
|
|
Loading…
Reference in New Issue