Simplify names of PersistentTasks-related classes
PersistentTask -> NodePersistentTask PersistentTasksInProgress -> PersistentTasks PersistentTaskInProgress -> PersistentTask
This commit is contained in:
parent
b33fc05492
commit
810d9335c0
|
@ -23,14 +23,14 @@ import org.elasticsearch.tasks.CancellableTask;
|
|||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
/**
|
||||
* Task that returns additional state information
|
||||
* Represents a executor node operation that corresponds to a persistent task
|
||||
*/
|
||||
public class PersistentTask extends CancellableTask {
|
||||
public class NodePersistentTask extends CancellableTask {
|
||||
private Provider<Status> statusProvider;
|
||||
|
||||
private long persistentTaskId;
|
||||
|
||||
public PersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
super(id, type, action, description, parentTask);
|
||||
}
|
||||
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.tasks.TaskCancelledException;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
import org.elasticsearch.persistent.PersistentTasks.PersistentTask;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -75,15 +75,15 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
|
|||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE);
|
||||
PersistentTasks previousTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE);
|
||||
|
||||
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
|
||||
// We have some changes let's check if they are related to our node
|
||||
String localNodeId = event.state().getNodes().getLocalNodeId();
|
||||
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
|
||||
if (tasks != null) {
|
||||
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
|
||||
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
|
||||
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
|
||||
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
|
||||
RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId);
|
||||
|
@ -124,10 +124,10 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
|
|||
|
||||
}
|
||||
|
||||
private <Request extends PersistentActionRequest> void startTask(PersistentTaskInProgress<Request> taskInProgress) {
|
||||
private <Request extends PersistentActionRequest> void startTask(PersistentTask<Request> taskInProgress) {
|
||||
PersistentActionRegistry.PersistentActionHolder<Request> holder =
|
||||
persistentActionRegistry.getPersistentActionHolderSafe(taskInProgress.getAction());
|
||||
PersistentTask task = (PersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
|
||||
NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]",
|
||||
taskInProgress.getRequest());
|
||||
boolean processed = false;
|
||||
try {
|
||||
|
@ -296,23 +296,23 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl
|
|||
}
|
||||
|
||||
private static class RunningPersistentTask implements Provider<Task.Status> {
|
||||
private final PersistentTask task;
|
||||
private final NodePersistentTask task;
|
||||
private final long id;
|
||||
private final AtomicReference<State> state;
|
||||
@Nullable
|
||||
private Exception failure;
|
||||
|
||||
RunningPersistentTask(PersistentTask task, long id) {
|
||||
RunningPersistentTask(NodePersistentTask task, long id) {
|
||||
this(task, id, State.STARTED);
|
||||
}
|
||||
|
||||
RunningPersistentTask(PersistentTask task, long id, State state) {
|
||||
RunningPersistentTask(NodePersistentTask task, long id, State state) {
|
||||
this.task = task;
|
||||
this.id = id;
|
||||
this.state = new AtomicReference<>(state);
|
||||
}
|
||||
|
||||
public PersistentTask getTask() {
|
||||
public NodePersistentTask getTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ public class PersistentActionExecutor {
|
|||
}
|
||||
|
||||
public <Request extends PersistentActionRequest> void executeAction(Request request,
|
||||
PersistentTask task,
|
||||
NodePersistentTask task,
|
||||
PersistentActionRegistry.PersistentActionHolder<Request> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
threadPool.executor(holder.getExecutor()).execute(new AbstractRunnable() {
|
||||
|
|
|
@ -30,6 +30,6 @@ import org.elasticsearch.tasks.TaskId;
|
|||
public abstract class PersistentActionRequest extends ActionRequest implements NamedWriteable, ToXContent {
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
|
||||
return new PersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
return new NodePersistentTask(id, type, action, getDescription(), parentTaskId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,15 +27,14 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.PersistentTask;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -70,7 +69,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
final Assignment assignment;
|
||||
if (stopped) {
|
||||
assignment = PersistentTasksInProgress.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere
|
||||
assignment = PersistentTasks.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere
|
||||
} else {
|
||||
assignment = getAssignement(action, currentState, request);
|
||||
}
|
||||
|
@ -85,7 +84,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(
|
||||
((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId());
|
||||
((PersistentTasks) newState.getMetaData().custom(PersistentTasks.TYPE)).getCurrentId());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -109,7 +108,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
|
||||
PersistentTasks.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
if (failure != null) {
|
||||
// If the task failed - we need to restart it on another node, otherwise we just remove it
|
||||
|
@ -147,7 +146,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
|
||||
PersistentTasks.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
return update(currentState, tasksInProgress
|
||||
.assignTask(id, (action, request) -> getAssignement(action, currentState, request)));
|
||||
|
@ -178,7 +177,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
|
||||
PersistentTasks.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
return update(currentState, tasksInProgress.removeTask(id));
|
||||
} else {
|
||||
|
@ -209,7 +208,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
PersistentTasksInProgress.Builder tasksInProgress = builder(currentState);
|
||||
PersistentTasks.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
|
||||
} else {
|
||||
|
@ -253,15 +252,15 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
}
|
||||
|
||||
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
|
||||
PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE);
|
||||
PersistentTasks prevTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE);
|
||||
if (tasks != null && (Objects.equals(tasks, prevTasks) == false ||
|
||||
event.nodesChanged() ||
|
||||
event.routingTableChanged() ||
|
||||
event.previousState().nodes().isLocalNodeElectedMaster() == false)) {
|
||||
// We need to check if removed nodes were running any of the tasks and reassign them
|
||||
boolean reassignmentRequired = false;
|
||||
for (PersistentTaskInProgress<?> taskInProgress : tasks.tasks()) {
|
||||
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
|
||||
if (taskInProgress.needsReassignment(event.state().nodes())) {
|
||||
// there is an unassigned task or task with a disappeared node - we need to try assigning it
|
||||
if (Objects.equals(taskInProgress.getAssignment(),
|
||||
|
@ -301,13 +300,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
}
|
||||
|
||||
static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) {
|
||||
PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = currentState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
ClusterState clusterState = currentState;
|
||||
DiscoveryNodes nodes = currentState.nodes();
|
||||
if (tasks != null) {
|
||||
logger.trace("reassigning {} persistent tasks", tasks.tasks().size());
|
||||
// We need to check if removed nodes were running any of the tasks and reassign them
|
||||
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
|
||||
for (PersistentTask<?> task : tasks.tasks()) {
|
||||
if (task.needsReassignment(nodes)) {
|
||||
// there is an unassigned task - we need to try assigning it
|
||||
Assignment assignment = decider.getAssignment(task.getAction(), clusterState, task.getRequest());
|
||||
|
@ -330,14 +329,14 @@ public class PersistentTaskClusterService extends AbstractComponent implements C
|
|||
return clusterState;
|
||||
}
|
||||
|
||||
private static PersistentTasksInProgress.Builder builder(ClusterState currentState) {
|
||||
return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE));
|
||||
private static PersistentTasks.Builder builder(ClusterState currentState) {
|
||||
return PersistentTasks.builder(currentState.getMetaData().custom(PersistentTasks.TYPE));
|
||||
}
|
||||
|
||||
private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) {
|
||||
private static ClusterState update(ClusterState currentState, PersistentTasks.Builder tasksInProgress) {
|
||||
if (tasksInProgress.isChanged()) {
|
||||
return ClusterState.builder(currentState).metaData(
|
||||
MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build())
|
||||
MetaData.builder(currentState.metaData()).putCustom(PersistentTasks.TYPE, tasksInProgress.build())
|
||||
).build();
|
||||
} else {
|
||||
return currentState;
|
||||
|
|
|
@ -56,25 +56,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
|
|||
/**
|
||||
* A cluster state record that contains a list of all running persistent tasks
|
||||
*/
|
||||
public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
|
||||
public final class PersistentTasks extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
|
||||
public static final String TYPE = "persistent_tasks";
|
||||
|
||||
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
|
||||
|
||||
// TODO: Implement custom Diff for tasks
|
||||
private final Map<Long, PersistentTaskInProgress<?>> tasks;
|
||||
private final Map<Long, PersistentTask<?>> tasks;
|
||||
|
||||
private final long currentId;
|
||||
|
||||
public PersistentTasksInProgress(long currentId, Map<Long, PersistentTaskInProgress<?>> tasks) {
|
||||
public PersistentTasks(long currentId, Map<Long, PersistentTask<?>> tasks) {
|
||||
this.currentId = currentId;
|
||||
this.tasks = tasks;
|
||||
}
|
||||
|
||||
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE, Builder::new);
|
||||
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
|
||||
|
||||
private static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER =
|
||||
new ObjectParser<>("running_tasks", TaskBuilder::new);
|
||||
private static final ObjectParser<TaskBuilder<PersistentActionRequest>, Void> PERSISTENT_TASK_PARSER =
|
||||
new ObjectParser<>("tasks", TaskBuilder::new);
|
||||
|
||||
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
|
||||
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
|
||||
|
@ -86,22 +86,20 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
|
||||
static {
|
||||
// Tasks parser initialization
|
||||
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
|
||||
PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_IN_PROGRESS_PARSER,
|
||||
new ParseField("running_tasks"));
|
||||
|
||||
PERSISTENT_TASKS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
|
||||
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
|
||||
|
||||
// Assignment parser
|
||||
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("executor_node"));
|
||||
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
|
||||
|
||||
// Task parser initialization
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareString(TaskBuilder::setAction, new ParseField("action"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects(
|
||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
|
||||
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setAction, new ParseField("action"));
|
||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
|
||||
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
|
||||
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
|
||||
PERSISTENT_TASK_PARSER.declareNamedObjects(
|
||||
(TaskBuilder<PersistentActionRequest> taskBuilder, List<PersistentActionRequest> objects) -> {
|
||||
if (objects.size() != 1) {
|
||||
throw new IllegalArgumentException("only one action request per task is allowed");
|
||||
|
@ -109,7 +107,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
taskBuilder.setRequest(objects.get(0));
|
||||
}, REQUEST_PARSER, new ParseField("request"));
|
||||
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects(
|
||||
PERSISTENT_TASK_PARSER.declareNamedObjects(
|
||||
(TaskBuilder<PersistentActionRequest> taskBuilder, List<Status> objects) -> {
|
||||
if (objects.size() != 1) {
|
||||
throw new IllegalArgumentException("only one status per task is allowed");
|
||||
|
@ -118,31 +116,31 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
}, STATUS_PARSER, new ParseField("status"));
|
||||
|
||||
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
|
||||
PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
|
||||
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
|
||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
|
||||
new ParseField("allocation_id_on_last_status_update"));
|
||||
}
|
||||
|
||||
public Collection<PersistentTaskInProgress<?>> tasks() {
|
||||
public Collection<PersistentTask<?>> tasks() {
|
||||
return this.tasks.values();
|
||||
}
|
||||
|
||||
public Map<Long, PersistentTaskInProgress<?>> taskMap() {
|
||||
public Map<Long, PersistentTask<?>> taskMap() {
|
||||
return this.tasks;
|
||||
}
|
||||
|
||||
public PersistentTaskInProgress<?> getTask(long id) {
|
||||
public PersistentTask<?> getTask(long id) {
|
||||
return this.tasks.get(id);
|
||||
}
|
||||
|
||||
public Collection<PersistentTaskInProgress<?>> findTasks(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
|
||||
public Collection<PersistentTask<?>> findTasks(String actionName, Predicate<PersistentTask<?>> predicate) {
|
||||
return this.tasks().stream()
|
||||
.filter(p -> actionName.equals(p.getAction()))
|
||||
.filter(predicate)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public boolean tasksExist(String actionName, Predicate<PersistentTaskInProgress<?>> predicate) {
|
||||
public boolean tasksExist(String actionName, Predicate<PersistentTask<?>> predicate) {
|
||||
return this.tasks().stream()
|
||||
.filter(p -> actionName.equals(p.getAction()))
|
||||
.anyMatch(predicate);
|
||||
|
@ -152,7 +150,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PersistentTasksInProgress that = (PersistentTasksInProgress) o;
|
||||
PersistentTasks that = (PersistentTasks) o;
|
||||
return currentId == that.currentId &&
|
||||
Objects.equals(tasks, that.tasks);
|
||||
}
|
||||
|
@ -181,8 +179,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
return ALL_CONTEXTS;
|
||||
}
|
||||
|
||||
public static PersistentTasksInProgress fromXContent(XContentParser parser) throws IOException {
|
||||
return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build();
|
||||
public static PersistentTasks fromXContent(XContentParser parser) throws IOException {
|
||||
return PERSISTENT_TASKS_PARSER.parse(parser, null).build();
|
||||
}
|
||||
|
||||
public static class Assignment {
|
||||
|
@ -236,7 +234,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
/**
|
||||
* A record that represents a single running persistent task
|
||||
*/
|
||||
public static class PersistentTaskInProgress<Request extends PersistentActionRequest> implements Writeable, ToXContent {
|
||||
public static class PersistentTask<Request extends PersistentActionRequest> implements Writeable, ToXContent {
|
||||
private final long id;
|
||||
private final long allocationId;
|
||||
private final String action;
|
||||
|
@ -250,24 +248,22 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
private final Long allocationIdOnLastStatusUpdate;
|
||||
|
||||
|
||||
public PersistentTaskInProgress(long id, String action, Request request, boolean stopped, boolean removeOnCompletion,
|
||||
Assignment assignment) {
|
||||
public PersistentTask(long id, String action, Request request, boolean stopped, boolean removeOnCompletion, Assignment assignment) {
|
||||
this(id, 0L, action, request, stopped, removeOnCompletion, null, assignment, null);
|
||||
}
|
||||
|
||||
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, boolean stopped, Assignment assignment) {
|
||||
public PersistentTask(PersistentTask<Request> task, boolean stopped, Assignment assignment) {
|
||||
this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status,
|
||||
assignment, task.allocationId);
|
||||
}
|
||||
|
||||
public PersistentTaskInProgress(PersistentTaskInProgress<Request> task, Status status) {
|
||||
public PersistentTask(PersistentTask<Request> task, Status status) {
|
||||
this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status,
|
||||
task.assignment, task.allocationId);
|
||||
}
|
||||
|
||||
private PersistentTaskInProgress(long id, long allocationId, String action, Request request,
|
||||
boolean stopped, boolean removeOnCompletion, Status status,
|
||||
Assignment assignment, Long allocationIdOnLastStatusUpdate) {
|
||||
private PersistentTask(long id, long allocationId, String action, Request request, boolean stopped, boolean removeOnCompletion,
|
||||
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
|
||||
this.id = id;
|
||||
this.allocationId = allocationId;
|
||||
this.action = action;
|
||||
|
@ -282,7 +278,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private PersistentTaskInProgress(StreamInput in) throws IOException {
|
||||
private PersistentTask(StreamInput in) throws IOException {
|
||||
id = in.readLong();
|
||||
allocationId = in.readLong();
|
||||
action = in.readString();
|
||||
|
@ -312,7 +308,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
PersistentTaskInProgress<?> that = (PersistentTaskInProgress<?>) o;
|
||||
PersistentTask<?> that = (PersistentTask<?>) o;
|
||||
return id == that.id &&
|
||||
allocationId == that.allocationId &&
|
||||
Objects.equals(action, that.action) &&
|
||||
|
@ -494,8 +490,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
return this;
|
||||
}
|
||||
|
||||
public PersistentTaskInProgress<Request> build() {
|
||||
return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
|
||||
public PersistentTask<Request> build() {
|
||||
return new PersistentTask<>(id, allocationId, action, request, stopped, removeOnCompletion, status,
|
||||
assignment, allocationIdOnLastStatusUpdate);
|
||||
}
|
||||
}
|
||||
|
@ -505,9 +501,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
return TYPE;
|
||||
}
|
||||
|
||||
public PersistentTasksInProgress(StreamInput in) throws IOException {
|
||||
public PersistentTasks(StreamInput in) throws IOException {
|
||||
currentId = in.readLong();
|
||||
tasks = in.readMap(StreamInput::readLong, PersistentTaskInProgress::new);
|
||||
tasks = in.readMap(StreamInput::readLong, PersistentTask::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -530,8 +526,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.field("current_id", currentId);
|
||||
builder.startArray("running_tasks");
|
||||
for (PersistentTaskInProgress<?> entry : tasks.values()) {
|
||||
builder.startArray("tasks");
|
||||
for (PersistentTask<?> entry : tasks.values()) {
|
||||
entry.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
|
@ -542,19 +538,19 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
return new Builder();
|
||||
}
|
||||
|
||||
public static Builder builder(PersistentTasksInProgress tasks) {
|
||||
public static Builder builder(PersistentTasks tasks) {
|
||||
return new Builder(tasks);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private final Map<Long, PersistentTaskInProgress<?>> tasks = new HashMap<>();
|
||||
private final Map<Long, PersistentTask<?>> tasks = new HashMap<>();
|
||||
private long currentId;
|
||||
private boolean changed;
|
||||
|
||||
public Builder() {
|
||||
}
|
||||
|
||||
public Builder(PersistentTasksInProgress tasksInProgress) {
|
||||
public Builder(PersistentTasks tasksInProgress) {
|
||||
if (tasksInProgress != null) {
|
||||
tasks.putAll(tasksInProgress.tasks);
|
||||
currentId = tasksInProgress.currentId;
|
||||
|
@ -570,7 +566,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
|
||||
private <Request extends PersistentActionRequest> Builder setTasks(List<TaskBuilder<Request>> tasks) {
|
||||
for (TaskBuilder builder : tasks) {
|
||||
PersistentTaskInProgress<?> task = builder.build();
|
||||
PersistentTask<?> task = builder.build();
|
||||
this.tasks.put(task.getId(), task);
|
||||
}
|
||||
return this;
|
||||
|
@ -585,7 +581,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
boolean removeOnCompletion, Assignment assignment) {
|
||||
changed = true;
|
||||
currentId++;
|
||||
tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, assignment));
|
||||
tasks.put(currentId, new PersistentTask<>(currentId, action, request, stopped, removeOnCompletion, assignment));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -593,10 +589,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
* Reassigns the task to another node if the task exist
|
||||
*/
|
||||
public Builder reassignTask(long taskId, Assignment assignment) {
|
||||
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
|
||||
PersistentTask<?> taskInProgress = tasks.get(taskId);
|
||||
if (taskInProgress != null) {
|
||||
changed = true;
|
||||
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -610,12 +606,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
@SuppressWarnings("unchecked")
|
||||
public <Request extends PersistentActionRequest> Builder assignTask(long taskId,
|
||||
BiFunction<String, Request, Assignment> executorNodeFunc) {
|
||||
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
|
||||
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
|
||||
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
|
||||
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
|
||||
if (assignment.isAssigned() || taskInProgress.isStopped()) {
|
||||
changed = true;
|
||||
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
|
||||
}
|
||||
}
|
||||
return this;
|
||||
|
@ -627,11 +623,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
@SuppressWarnings("unchecked")
|
||||
public <Request extends PersistentActionRequest> Builder reassignTask(long taskId,
|
||||
BiFunction<String, Request, Assignment> executorNodeFunc) {
|
||||
PersistentTaskInProgress<Request> taskInProgress = (PersistentTaskInProgress<Request>) tasks.get(taskId);
|
||||
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
|
||||
if (taskInProgress != null) {
|
||||
changed = true;
|
||||
Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request);
|
||||
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -640,10 +636,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
* Updates the task status if the task exist
|
||||
*/
|
||||
public Builder updateTaskStatus(long taskId, Status status) {
|
||||
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
|
||||
PersistentTask<?> taskInProgress = tasks.get(taskId);
|
||||
if (taskInProgress != null) {
|
||||
changed = true;
|
||||
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -664,13 +660,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped.
|
||||
*/
|
||||
public Builder finishTask(long taskId) {
|
||||
PersistentTaskInProgress<?> taskInProgress = tasks.get(taskId);
|
||||
PersistentTask<?> taskInProgress = tasks.get(taskId);
|
||||
if (taskInProgress != null) {
|
||||
changed = true;
|
||||
if (taskInProgress.removeOnCompletion) {
|
||||
tasks.remove(taskId);
|
||||
} else {
|
||||
tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
|
||||
}
|
||||
}
|
||||
return this;
|
||||
|
@ -697,8 +693,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable<MetaD
|
|||
return changed;
|
||||
}
|
||||
|
||||
public PersistentTasksInProgress build() {
|
||||
return new PersistentTasksInProgress(currentId, Collections.unmodifiableMap(tasks));
|
||||
public PersistentTasks build() {
|
||||
return new PersistentTasks(currentId, Collections.unmodifiableMap(tasks));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -80,14 +80,14 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
|
|||
protected DiscoveryNode selectLeastLoadedNode(ClusterState clusterState, Predicate<DiscoveryNode> selector) {
|
||||
long minLoad = Long.MAX_VALUE;
|
||||
DiscoveryNode minLoadedNode = null;
|
||||
PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks persistentTasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
for (DiscoveryNode node : clusterState.getNodes()) {
|
||||
if (selector.test(node)) {
|
||||
if (persistentTasksInProgress == null) {
|
||||
if (persistentTasks == null) {
|
||||
// We don't have any task running yet, pick the first available node
|
||||
return node;
|
||||
}
|
||||
long numberOfTasks = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), actionName);
|
||||
long numberOfTasks = persistentTasks.getNumberOfTasksOnNode(node.getId(), actionName);
|
||||
if (minLoad > numberOfTasks) {
|
||||
minLoad = numberOfTasks;
|
||||
minLoadedNode = node;
|
||||
|
@ -117,7 +117,7 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
|
|||
* The status can be used to store the current progress of the task or provide an insight for the
|
||||
* task allocator about the state of the currently running tasks.
|
||||
*/
|
||||
protected void updatePersistentTaskStatus(PersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
protected void updatePersistentTaskStatus(NodePersistentTask task, Task.Status status, ActionListener<Empty> listener) {
|
||||
persistentActionService.updateStatus(task.getPersistentTaskId(), status,
|
||||
new ActionListener<UpdatePersistentTaskStatusAction.Response>() {
|
||||
@Override
|
||||
|
@ -139,7 +139,7 @@ public abstract class TransportPersistentAction<Request extends PersistentAction
|
|||
* possibly on a different node. If listener.onResponse() is called, the task is considered to be successfully
|
||||
* completed and will be removed from the cluster state and not restarted.
|
||||
*/
|
||||
protected abstract void nodeOperation(PersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
protected abstract void nodeOperation(NodePersistentTask task, Request request, ActionListener<Empty> listener);
|
||||
|
||||
public String getExecutor() {
|
||||
return executor;
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
* {@link org.elasticsearch.persistent.PersistentTaskClusterService} to update cluster state with the record about running persistent
|
||||
* task.
|
||||
* <p>
|
||||
* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksInProgress} in the cluster state to indicate that
|
||||
* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasks} in the cluster state to indicate that
|
||||
* there is a new persistent action
|
||||
* running in the system.
|
||||
* <p>
|
||||
|
@ -37,11 +37,11 @@
|
|||
* the cluster state and starts execution of all new actions assigned to the node it is running on.
|
||||
* <p>
|
||||
* 4. If the action fails to start on the node, the {@link org.elasticsearch.persistent.PersistentActionCoordinator} uses the
|
||||
* {@link org.elasticsearch.persistent.PersistentTasksInProgress} to notify the
|
||||
* {@link org.elasticsearch.persistent.PersistentTasks} to notify the
|
||||
* {@link org.elasticsearch.persistent.PersistentActionService}, which reassigns the action to another node in the cluster.
|
||||
* <p>
|
||||
* 5. If action finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the
|
||||
* cluster state.
|
||||
* cluster state unless .
|
||||
* <p>
|
||||
* 6. The {@link org.elasticsearch.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent action.
|
||||
*/
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.persistent.CompletionPersistentTaskAction.Response;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -86,7 +86,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
|
|||
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY))
|
||||
.build();
|
||||
|
||||
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
|
||||
PersistentTasks.Builder tasks = PersistentTasks.builder();
|
||||
boolean added = false;
|
||||
if (nonLocalNodesCount > 0) {
|
||||
for (int i = 0; i < randomInt(5); i++) {
|
||||
|
@ -105,7 +105,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
|
|||
|
||||
}
|
||||
MetaData.Builder metaData = MetaData.builder(state.metaData());
|
||||
metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build());
|
||||
metaData.putCustom(PersistentTasks.TYPE, tasks.build());
|
||||
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
|
||||
|
||||
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
|
||||
|
@ -302,36 +302,36 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
|
|||
|
||||
private <Request extends PersistentActionRequest> ClusterState addTask(ClusterState state, String action, Request request,
|
||||
String node) {
|
||||
PersistentTasksInProgress.Builder builder =
|
||||
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
|
||||
PersistentTasks.Builder builder =
|
||||
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
|
||||
builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build();
|
||||
}
|
||||
|
||||
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
|
||||
PersistentTasksInProgress.Builder builder =
|
||||
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
|
||||
PersistentTasks.Builder builder =
|
||||
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
|
||||
assertTrue(builder.hasTask(taskId));
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
|
||||
builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build();
|
||||
}
|
||||
|
||||
private ClusterState removeTask(ClusterState state, long taskId) {
|
||||
PersistentTasksInProgress.Builder builder =
|
||||
PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE));
|
||||
PersistentTasks.Builder builder =
|
||||
PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE));
|
||||
assertTrue(builder.hasTask(taskId));
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE,
|
||||
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE,
|
||||
builder.removeTask(taskId).build())).build();
|
||||
}
|
||||
|
||||
private class Execution {
|
||||
private final PersistentActionRequest request;
|
||||
private final PersistentTask task;
|
||||
private final NodePersistentTask task;
|
||||
private final PersistentActionRegistry.PersistentActionHolder<?> holder;
|
||||
private final ActionListener<Empty> listener;
|
||||
|
||||
Execution(PersistentActionRequest request, PersistentTask task, PersistentActionRegistry.PersistentActionHolder<?> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
Execution(PersistentActionRequest request, NodePersistentTask task, PersistentActionRegistry.PersistentActionHolder<?> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
this.request = request;
|
||||
this.task = task;
|
||||
this.holder = holder;
|
||||
|
@ -347,7 +347,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <Request extends PersistentActionRequest> void executeAction(Request request, PersistentTask task,
|
||||
public <Request extends PersistentActionRequest> void executeAction(Request request, NodePersistentTask task,
|
||||
PersistentActionRegistry.PersistentActionHolder<Request> holder,
|
||||
ActionListener<Empty> listener) {
|
||||
executions.add(new Execution(request, task, holder, listener));
|
||||
|
|
|
@ -17,7 +17,7 @@ package org.elasticsearch.persistent;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
import org.elasticsearch.persistent.PersistentTasks.PersistentTask;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
|
||||
|
||||
|
@ -68,8 +68,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
final int numberOfRunningTasks = runningTasks;
|
||||
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
|
||||
|
||||
if (numberOfRunningTasks > 0) {
|
||||
|
@ -85,11 +85,11 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
|
|||
internalCluster().fullRestart();
|
||||
ensureYellow();
|
||||
|
||||
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks));
|
||||
// Check that cluster state is correct
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
|
||||
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
|
||||
assertNotNull(task);
|
||||
assertThat(task.isStopped(), equalTo(stopped[i]));
|
||||
}
|
||||
|
@ -102,9 +102,9 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
|
|||
});
|
||||
|
||||
// Start all other tasks
|
||||
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE);
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
PersistentTaskInProgress<?> task = tasksInProgress.getTask(taskIds[i]);
|
||||
PersistentTask<?> task = tasksInProgress.getTask(taskIds[i]);
|
||||
assertNotNull(task);
|
||||
logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode());
|
||||
assertThat(task.isStopped(), equalTo(stopped[i]));
|
||||
|
@ -128,8 +128,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase {
|
|||
|
||||
assertBusy(() -> {
|
||||
// Make sure the task is removed from the cluster state
|
||||
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
|
||||
assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE)).tasks(), empty());
|
||||
});
|
||||
|
||||
}
|
||||
|
|
|
@ -126,8 +126,8 @@ public class PersistentActionIT extends ESIntegTestCase {
|
|||
.setRemoveOnCompletion(false)
|
||||
.setStopped(stopped).get().getTaskId();
|
||||
|
||||
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress.tasks().size(), equalTo(1));
|
||||
assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped));
|
||||
assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue());
|
||||
|
@ -164,8 +164,8 @@ public class PersistentActionIT extends ESIntegTestCase {
|
|||
|
||||
assertBusy(() -> {
|
||||
// Wait for the task to be marked as stopped
|
||||
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(1));
|
||||
assertThat(tasks.getTask(taskId).isStopped(), equalTo(true));
|
||||
assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false));
|
||||
|
@ -216,8 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase {
|
|||
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]")
|
||||
.get().getTasks().get(0);
|
||||
|
||||
PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress.tasks().size(), equalTo(1));
|
||||
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
|
||||
|
||||
|
@ -230,8 +230,8 @@ public class PersistentActionIT extends ESIntegTestCase {
|
|||
|
||||
int finalI = i;
|
||||
assertBusy(() -> {
|
||||
PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE);
|
||||
assertThat(tasks.tasks().size(), equalTo(1));
|
||||
assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue());
|
||||
assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}"));
|
||||
|
@ -272,8 +272,8 @@ public class PersistentActionIT extends ESIntegTestCase {
|
|||
assertThat(tasks.size(), equalTo(0));
|
||||
|
||||
// Make sure the task is removed from the cluster state
|
||||
assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksInProgress.TYPE)).tasks(), empty());
|
||||
assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasks.TYPE)).tasks(), empty());
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -28,8 +28,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.PersistentTask;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -76,14 +76,14 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
|
||||
public void testReassignTasksWithNoTasks() {
|
||||
ClusterState clusterState = initialState();
|
||||
assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue());
|
||||
assertThat(reassign(clusterState).metaData().custom(PersistentTasks.TYPE), nullValue());
|
||||
}
|
||||
|
||||
public void testReassignConsidersClusterStateUpdates() {
|
||||
ClusterState clusterState = initialState();
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
|
||||
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
|
||||
PersistentTasks.Builder tasks = PersistentTasks.builder(
|
||||
clusterState.metaData().custom(PersistentTasks.TYPE));
|
||||
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
|
||||
addTestNodes(nodes, randomIntBetween(1, 10));
|
||||
int numberOfTasks = randomIntBetween(2, 40);
|
||||
|
@ -91,11 +91,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false);
|
||||
}
|
||||
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build());
|
||||
clusterState = builder.metaData(metaData).nodes(nodes).build();
|
||||
ClusterState newClusterState = reassign(clusterState);
|
||||
|
||||
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress, notNullValue());
|
||||
|
||||
}
|
||||
|
@ -103,8 +103,8 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
public void testReassignTasks() {
|
||||
ClusterState clusterState = initialState();
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(
|
||||
clusterState.metaData().custom(PersistentTasksInProgress.TYPE));
|
||||
PersistentTasks.Builder tasks = PersistentTasks.builder(
|
||||
clusterState.metaData().custom(PersistentTasks.TYPE));
|
||||
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
|
||||
addTestNodes(nodes, randomIntBetween(1, 10));
|
||||
int numberOfTasks = randomIntBetween(0, 40);
|
||||
|
@ -128,11 +128,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
|
||||
}
|
||||
}
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build());
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build());
|
||||
clusterState = builder.metaData(metaData).nodes(nodes).build();
|
||||
ClusterState newClusterState = reassign(clusterState);
|
||||
|
||||
PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
assertThat(tasksInProgress, notNullValue());
|
||||
|
||||
assertThat("number of tasks shouldn't change as a result or reassignment",
|
||||
|
@ -140,7 +140,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
|
||||
int assignOneCount = 0;
|
||||
|
||||
for (PersistentTaskInProgress<?> task : tasksInProgress.tasks()) {
|
||||
for (PersistentTask<?> task : tasksInProgress.tasks()) {
|
||||
if (task.isStopped()) {
|
||||
assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue());
|
||||
assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getAction()));
|
||||
|
@ -151,7 +151,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
assertThat(task.getExecutorNode(), notNullValue());
|
||||
assertThat(task.isAssigned(), equalTo(true));
|
||||
if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) {
|
||||
logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString());
|
||||
logger.info(clusterState.metaData().custom(PersistentTasks.TYPE).toString());
|
||||
}
|
||||
assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(),
|
||||
clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true));
|
||||
|
@ -213,7 +213,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
|
||||
private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) {
|
||||
DiscoveryNodes nodes = clusterState.nodes();
|
||||
PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasksInProgress = clusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
if (tasksInProgress.findTasks("assign_one",
|
||||
task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) {
|
||||
return randomNodeAssignment(clusterState.nodes());
|
||||
|
@ -242,15 +242,15 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
return "nodes_changed: " + event.nodesChanged() +
|
||||
" nodes_removed:" + event.nodesRemoved() +
|
||||
" routing_table_changed:" + event.routingTableChanged() +
|
||||
" tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE);
|
||||
" tasks: " + event.state().metaData().custom(PersistentTasks.TYPE);
|
||||
}
|
||||
|
||||
private ClusterState significantChange(ClusterState clusterState) {
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
if (tasks != null) {
|
||||
if (randomBoolean()) {
|
||||
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
|
||||
for (PersistentTask<?> task : tasks.tasks()) {
|
||||
if (task.isAssigned() && clusterState.nodes().nodeExists(task.getExecutorNode())) {
|
||||
logger.info("removed node {}", task.getExecutorNode());
|
||||
builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode()));
|
||||
|
@ -265,11 +265,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
// we don't have any unassigned tasks - add some
|
||||
if (randomBoolean()) {
|
||||
logger.info("added random task");
|
||||
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false);
|
||||
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks), null, false);
|
||||
tasksOrNodesChanged = true;
|
||||
} else {
|
||||
logger.info("added unassignable task with custom assignment message");
|
||||
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks),
|
||||
addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks),
|
||||
new Assignment(null, "change me"), "never_assign", false);
|
||||
tasksOrNodesChanged = true;
|
||||
}
|
||||
|
@ -292,11 +292,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
private PersistentTasksInProgress removeTasksWithChangingAssignment(PersistentTasksInProgress tasks) {
|
||||
private PersistentTasks removeTasksWithChangingAssignment(PersistentTasks tasks) {
|
||||
if (tasks != null) {
|
||||
boolean changed = false;
|
||||
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
|
||||
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
|
||||
PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks);
|
||||
for (PersistentTask<?> task : tasks.tasks()) {
|
||||
// Remove all unassigned tasks that cause changing assignments they might trigger a significant change
|
||||
if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) &&
|
||||
"change me".equals(task.getAssignment().getExplanation())) {
|
||||
|
@ -314,9 +314,9 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
|
||||
private ClusterState insignificantChange(ClusterState clusterState) {
|
||||
ClusterState.Builder builder = ClusterState.builder(clusterState);
|
||||
PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE);
|
||||
PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE);
|
||||
tasks = removeTasksWithChangingAssignment(tasks);
|
||||
PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks);
|
||||
PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks);
|
||||
|
||||
if (randomBoolean()) {
|
||||
if (hasAssignableTasks(tasks, clusterState.nodes()) == false) {
|
||||
|
@ -338,7 +338,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
} else {
|
||||
logger.info("changed routing table");
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData());
|
||||
metaData.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
|
||||
metaData.putCustom(PersistentTasks.TYPE, tasksBuilder.build());
|
||||
RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable());
|
||||
changeRoutingTable(metaData, routingTable);
|
||||
builder.metaData(metaData).routingTable(routingTable.build());
|
||||
|
@ -360,18 +360,18 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
// clear the task
|
||||
if (randomBoolean()) {
|
||||
logger.info("removed all tasks");
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE,
|
||||
PersistentTasksInProgress.builder().build());
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE,
|
||||
PersistentTasks.builder().build());
|
||||
return builder.metaData(metaData).build();
|
||||
} else {
|
||||
logger.info("set task custom to null");
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE);
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasks.TYPE);
|
||||
return builder.metaData(metaData).build();
|
||||
}
|
||||
}
|
||||
logger.info("removed all unassigned tasks and changed routing table");
|
||||
if (tasks != null) {
|
||||
for (PersistentTaskInProgress<?> task : tasks.tasks()) {
|
||||
for (PersistentTask<?> task : tasks.tasks()) {
|
||||
if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) {
|
||||
tasksBuilder.removeTask(task.getId());
|
||||
}
|
||||
|
@ -384,11 +384,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
.numberOfReplicas(1)
|
||||
.build();
|
||||
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false)
|
||||
.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build());
|
||||
.putCustom(PersistentTasks.TYPE, tasksBuilder.build());
|
||||
return builder.metaData(metaData).build();
|
||||
}
|
||||
|
||||
private boolean hasAssignableTasks(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) {
|
||||
private boolean hasAssignableTasks(PersistentTasks tasks, DiscoveryNodes discoveryNodes) {
|
||||
if (tasks == null || tasks.tasks().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -403,26 +403,26 @@ public class PersistentTaskClusterServiceTests extends ESTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) {
|
||||
private boolean hasTasksAssignedTo(PersistentTasks tasks, String nodeId) {
|
||||
return tasks != null && tasks.tasks().stream().anyMatch(
|
||||
task -> nodeId.equals(task.getExecutorNode())) == false;
|
||||
}
|
||||
|
||||
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
|
||||
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
|
||||
MetaData.Builder metaData, PersistentTasks.Builder tasks,
|
||||
String node, boolean stopped) {
|
||||
return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)),
|
||||
randomAsciiOfLength(10), stopped);
|
||||
}
|
||||
|
||||
private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder,
|
||||
MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks,
|
||||
MetaData.Builder metaData, PersistentTasks.Builder tasks,
|
||||
Assignment assignment, String param, boolean stopped) {
|
||||
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE,
|
||||
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasks.TYPE,
|
||||
tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build()));
|
||||
}
|
||||
|
||||
private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) {
|
||||
private void addTask(PersistentTasks.Builder tasks, String action, String param, String node, boolean stopped) {
|
||||
tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action));
|
||||
}
|
||||
|
||||
|
|
|
@ -34,9 +34,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Builder;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.PersistentTaskInProgress;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Builder;
|
||||
import org.elasticsearch.persistent.PersistentTasks.PersistentTask;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.Status;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestPersistentAction;
|
||||
import org.elasticsearch.persistent.TestPersistentActionPlugin.TestRequest;
|
||||
|
@ -50,12 +50,12 @@ import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY;
|
|||
import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT;
|
||||
import static org.elasticsearch.persistent.TransportPersistentAction.NO_NODE_FOUND;
|
||||
|
||||
public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase<Custom> {
|
||||
public class PersistentTasksTests extends AbstractDiffableSerializationTestCase<Custom> {
|
||||
|
||||
@Override
|
||||
protected PersistentTasksInProgress createTestInstance() {
|
||||
protected PersistentTasks createTestInstance() {
|
||||
int numberOfTasks = randomInt(10);
|
||||
PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder();
|
||||
PersistentTasks.Builder tasks = PersistentTasks.builder();
|
||||
for (int i = 0; i < numberOfTasks; i++) {
|
||||
boolean stopped = randomBoolean();
|
||||
tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)),
|
||||
|
@ -70,14 +70,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
|
||||
@Override
|
||||
protected Writeable.Reader<Custom> instanceReader() {
|
||||
return PersistentTasksInProgress::new;
|
||||
return PersistentTasks::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(Arrays.asList(
|
||||
new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
|
||||
new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
|
||||
new Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new),
|
||||
new Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom),
|
||||
new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
|
||||
new Entry(Task.Status.class, Status.NAME, Status::new)
|
||||
));
|
||||
|
@ -85,7 +85,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
|
||||
@Override
|
||||
protected Custom makeTestChanges(Custom testInstance) {
|
||||
PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance;
|
||||
PersistentTasks tasksInProgress = (PersistentTasks) testInstance;
|
||||
Builder builder = new Builder();
|
||||
switch (randomInt(3)) {
|
||||
case 0:
|
||||
|
@ -118,12 +118,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
|
||||
@Override
|
||||
protected Writeable.Reader<Diff<Custom>> diffReader() {
|
||||
return PersistentTasksInProgress::readDiffFrom;
|
||||
return PersistentTasks::readDiffFrom;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException {
|
||||
return PersistentTasksInProgress.fromXContent(parser);
|
||||
protected PersistentTasks doParseInstance(XContentParser parser) throws IOException {
|
||||
return PersistentTasks.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -155,7 +155,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
return builder;
|
||||
}
|
||||
|
||||
private long pickRandomTask(PersistentTasksInProgress testInstance) {
|
||||
private long pickRandomTask(PersistentTasks testInstance) {
|
||||
return randomFrom(new ArrayList<>(testInstance.tasks())).getId();
|
||||
}
|
||||
|
||||
|
@ -170,9 +170,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSerializationContext() throws Exception {
|
||||
PersistentTasksInProgress testInstance = createTestInstance();
|
||||
PersistentTasks testInstance = createTestInstance();
|
||||
for (int i = 0; i < randomInt(10); i++) {
|
||||
testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance);
|
||||
testInstance = (PersistentTasks) makeTestChanges(testInstance);
|
||||
}
|
||||
|
||||
ToXContent.MapParams params = new ToXContent.MapParams(
|
||||
|
@ -183,12 +183,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
XContentBuilder shuffled = shuffleXContent(builder);
|
||||
|
||||
XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes());
|
||||
PersistentTasksInProgress newInstance = doParseInstance(parser);
|
||||
PersistentTasks newInstance = doParseInstance(parser);
|
||||
assertNotSame(newInstance, testInstance);
|
||||
|
||||
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
|
||||
for (PersistentTaskInProgress<?> testTask : testInstance.tasks()) {
|
||||
PersistentTaskInProgress<TestRequest> newTask = (PersistentTaskInProgress<TestRequest>) newInstance.getTask(testTask.getId());
|
||||
for (PersistentTask<?> testTask : testInstance.tasks()) {
|
||||
PersistentTask<TestRequest> newTask = (PersistentTask<TestRequest>) newInstance.getTask(testTask.getId());
|
||||
assertNotNull(newTask);
|
||||
|
||||
// Things that should be serialized
|
||||
|
@ -205,14 +205,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
}
|
||||
|
||||
public void testBuilder() {
|
||||
PersistentTasksInProgress persistentTasksInProgress = null;
|
||||
PersistentTasks persistentTasks = null;
|
||||
long lastKnownTask = -1;
|
||||
for (int i = 0; i < randomIntBetween(10, 100); i++) {
|
||||
final Builder builder;
|
||||
if (randomBoolean()) {
|
||||
builder = new Builder();
|
||||
} else {
|
||||
builder = new Builder(persistentTasksInProgress);
|
||||
builder = new Builder(persistentTasks);
|
||||
}
|
||||
boolean changed = false;
|
||||
for (int j = 0; j < randomIntBetween(1, 10); j++) {
|
||||
|
@ -233,7 +233,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
break;
|
||||
case 2:
|
||||
if (builder.hasTask(lastKnownTask)) {
|
||||
PersistentTaskInProgress<?> task = builder.build().getTask(lastKnownTask);
|
||||
PersistentTask<?> task = builder.build().getTask(lastKnownTask);
|
||||
if (randomBoolean()) {
|
||||
// Trying to reassign to the same node
|
||||
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
|
||||
|
@ -276,7 +276,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio
|
|||
}
|
||||
}
|
||||
assertEquals(changed, builder.isChanged());
|
||||
persistentTasksInProgress = builder.build();
|
||||
persistentTasks = builder.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -63,7 +63,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportResponse.Empty;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.persistent.PersistentTasksInProgress.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasks.Assignment;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -121,8 +121,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
|
|||
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class,
|
||||
PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new),
|
||||
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new),
|
||||
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
|
||||
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new),
|
||||
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new)
|
||||
);
|
||||
}
|
||||
|
@ -130,8 +130,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
|
|||
@Override
|
||||
public List<NamedXContentRegistry.Entry> getNamedXContent() {
|
||||
return Arrays.asList(
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE),
|
||||
PersistentTasksInProgress::fromXContent),
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasks.TYPE),
|
||||
PersistentTasks::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME),
|
||||
TestRequest::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
|
||||
|
@ -382,7 +382,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(PersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener<Empty> listener) {
|
||||
logger.info("started node operation for the task {}", task);
|
||||
try {
|
||||
TestTask testTask = (TestTask) task;
|
||||
|
@ -465,7 +465,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin {
|
|||
}
|
||||
|
||||
|
||||
public static class TestTask extends PersistentTask {
|
||||
public static class TestTask extends NodePersistentTask {
|
||||
private volatile String operation;
|
||||
|
||||
public TestTask(long id, String type, String action, String description, TaskId parentTask) {
|
||||
|
|
Loading…
Reference in New Issue