Uncouple persistent task state and status (#31031)
This pull request removes the relationship between the state of persistent task (as stored in the cluster state) and the status of the task (as reported by the Task APIs and used in various places) that have been confusing for some time (#29608). In order to do that, a new PersistentTaskState interface is added. This interface represents the persisted state of a persistent task. The methods used to update the state of persistent tasks are renamed: updatePersistentStatus() becomes updatePersistentTaskState() and now takes a PersistentTaskState as a parameter. The Task.Status type as been changed to PersistentTaskState in all places were it make sense (in persistent task customs in cluster state and all other methods that deal with the state of an allocated persistent task).
This commit is contained in:
parent
8c6ee7db54
commit
992c7889ee
|
@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
|
||||
|
@ -77,8 +76,9 @@ public class AllocatedPersistentTask extends CancellableTask {
|
|||
* <p>
|
||||
* This doesn't affect the status of this allocated task.
|
||||
*/
|
||||
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
|
||||
public void updatePersistentTaskState(final PersistentTaskState state,
|
||||
final ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, listener);
|
||||
}
|
||||
|
||||
public String getPersistentTaskId() {
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.elasticsearch.persistent;
|
|||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
/**
|
||||
|
@ -29,16 +28,17 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
* It abstracts away the execution of tasks and greatly simplifies testing of PersistentTasksNodeService
|
||||
*/
|
||||
public class NodePersistentTasksExecutor {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public NodePersistentTasksExecutor(ThreadPool threadPool) {
|
||||
NodePersistentTasksExecutor(ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public <Params extends PersistentTaskParams> void executeTask(Params params,
|
||||
@Nullable Task.Status status,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Params> executor) {
|
||||
public <Params extends PersistentTaskParams> void executeTask(final Params params,
|
||||
final @Nullable PersistentTaskState state,
|
||||
final AllocatedPersistentTask task,
|
||||
final PersistentTasksExecutor<Params> executor) {
|
||||
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
|
@ -49,14 +49,12 @@ public class NodePersistentTasksExecutor {
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
try {
|
||||
executor.nodeOperation(task, params, status);
|
||||
executor.nodeOperation(task, params, state);
|
||||
} catch (Exception ex) {
|
||||
task.markAsFailed(ex);
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.persistent;
|
||||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
|
||||
/**
|
||||
* {@link PersistentTaskState} represents the state of the persistent tasks, as it
|
||||
* is persisted in the cluster state.
|
||||
*/
|
||||
public interface PersistentTaskState extends ToXContentObject, NamedWriteable {
|
||||
}
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.persistent.decider.AssignmentDecision;
|
||||
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -178,27 +177,30 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
}
|
||||
|
||||
/**
|
||||
* Update task status
|
||||
* Update the state of a persistent task
|
||||
*
|
||||
* @param id the id of a persistent task
|
||||
* @param allocationId the expected allocation id of the persistent task
|
||||
* @param status new status
|
||||
* @param taskId the id of a persistent task
|
||||
* @param taskAllocationId the expected allocation id of the persistent task
|
||||
* @param taskState new state
|
||||
* @param listener the listener that will be called when task is removed
|
||||
*/
|
||||
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
|
||||
public void updatePersistentTaskState(final String taskId,
|
||||
final long taskAllocationId,
|
||||
final PersistentTaskState taskState,
|
||||
final ActionListener<PersistentTask<?>> listener) {
|
||||
clusterService.submitStateUpdateTask("update task state", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
|
||||
if (tasksInProgress.hasTask(id, allocationId)) {
|
||||
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
|
||||
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
|
||||
return update(currentState, tasksInProgress.updateTaskState(taskId, taskState));
|
||||
} else {
|
||||
if (tasksInProgress.hasTask(id)) {
|
||||
logger.warn("trying to update status on task {} with unexpected allocation id {}", id, allocationId);
|
||||
if (tasksInProgress.hasTask(taskId)) {
|
||||
logger.warn("trying to update state on task {} with unexpected allocation id {}", taskId, taskAllocationId);
|
||||
} else {
|
||||
logger.warn("trying to update status on non-existing task {}", id);
|
||||
logger.warn("trying to update state on non-existing task {}", taskId);
|
||||
}
|
||||
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", id, allocationId);
|
||||
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,7 +211,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id));
|
||||
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -38,8 +38,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.Task.Status;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
@ -61,13 +59,12 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
|
|||
* A cluster state record that contains a list of all running persistent tasks
|
||||
*/
|
||||
public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
|
||||
public static final String TYPE = "persistent_tasks";
|
||||
|
||||
public static final String TYPE = "persistent_tasks";
|
||||
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
|
||||
|
||||
// TODO: Implement custom Diff for tasks
|
||||
private final Map<String, PersistentTask<?>> tasks;
|
||||
|
||||
private final long lastAllocationId;
|
||||
|
||||
public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, PersistentTask<?>> tasks) {
|
||||
|
@ -94,8 +91,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
|
||||
parser.declareObject(TaskDescriptionBuilder::setParams,
|
||||
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params"));
|
||||
parser.declareObject(TaskDescriptionBuilder::setStatus,
|
||||
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
|
||||
parser.declareObject(TaskDescriptionBuilder::setState,
|
||||
(p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("state", "status"));
|
||||
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);
|
||||
|
||||
// Assignment parser
|
||||
|
@ -115,7 +112,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
|
||||
taskBuilder.setTaskName(builder.taskName);
|
||||
taskBuilder.setParams(builder.params);
|
||||
taskBuilder.setStatus(builder.status);
|
||||
taskBuilder.setState(builder.state);
|
||||
}, TASK_DESCRIPTION_PARSER, new ParseField("task"));
|
||||
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
|
||||
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
|
||||
|
@ -123,12 +120,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
}
|
||||
|
||||
/**
|
||||
* Private builder used in XContent parser to build task-specific portion (params and status)
|
||||
* Private builder used in XContent parser to build task-specific portion (params and state)
|
||||
*/
|
||||
private static class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
|
||||
|
||||
private final String taskName;
|
||||
private Params params;
|
||||
private Status status;
|
||||
private PersistentTaskState state;
|
||||
|
||||
private TaskDescriptionBuilder(String taskName) {
|
||||
this.taskName = taskName;
|
||||
|
@ -139,8 +137,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
return this;
|
||||
}
|
||||
|
||||
private TaskDescriptionBuilder setStatus(Status status) {
|
||||
this.status = status;
|
||||
private TaskDescriptionBuilder setState(PersistentTaskState state) {
|
||||
this.state = state;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -261,37 +259,34 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
* A record that represents a single running persistent task
|
||||
*/
|
||||
public static class PersistentTask<P extends PersistentTaskParams> implements Writeable, ToXContentObject {
|
||||
|
||||
private final String id;
|
||||
private final long allocationId;
|
||||
private final String taskName;
|
||||
private final P params;
|
||||
@Nullable
|
||||
private final Status status;
|
||||
private final @Nullable PersistentTaskState state;
|
||||
private final Assignment assignment;
|
||||
@Nullable
|
||||
private final Long allocationIdOnLastStatusUpdate;
|
||||
private final @Nullable Long allocationIdOnLastStatusUpdate;
|
||||
|
||||
public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) {
|
||||
this(id, allocationId, taskName, params, null, assignment, null);
|
||||
public PersistentTask(final String id, final String name, final P params, final long allocationId, final Assignment assignment) {
|
||||
this(id, allocationId, name, params, null, assignment, null);
|
||||
}
|
||||
|
||||
public PersistentTask(PersistentTask<P> task, long allocationId, Assignment assignment) {
|
||||
this(task.id, allocationId, task.taskName, task.params, task.status,
|
||||
assignment, task.allocationId);
|
||||
public PersistentTask(final PersistentTask<P> task, final long allocationId, final Assignment assignment) {
|
||||
this(task.id, allocationId, task.taskName, task.params, task.state, assignment, task.allocationId);
|
||||
}
|
||||
|
||||
public PersistentTask(PersistentTask<P> task, Status status) {
|
||||
this(task.id, task.allocationId, task.taskName, task.params, status,
|
||||
task.assignment, task.allocationId);
|
||||
public PersistentTask(final PersistentTask<P> task, final PersistentTaskState state) {
|
||||
this(task.id, task.allocationId, task.taskName, task.params, state, task.assignment, task.allocationId);
|
||||
}
|
||||
|
||||
private PersistentTask(String id, long allocationId, String taskName, P params,
|
||||
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
|
||||
private PersistentTask(final String id, final long allocationId, final String name, final P params,
|
||||
final PersistentTaskState state, final Assignment assignment, final Long allocationIdOnLastStatusUpdate) {
|
||||
this.id = id;
|
||||
this.allocationId = allocationId;
|
||||
this.taskName = taskName;
|
||||
this.taskName = name;
|
||||
this.params = params;
|
||||
this.status = status;
|
||||
this.state = state;
|
||||
this.assignment = assignment;
|
||||
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
|
||||
if (params != null) {
|
||||
|
@ -300,10 +295,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
params.getWriteableName() + " task: " + taskName);
|
||||
}
|
||||
}
|
||||
if (status != null) {
|
||||
if (status.getWriteableName().equals(taskName) == false) {
|
||||
if (state != null) {
|
||||
if (state.getWriteableName().equals(taskName) == false) {
|
||||
throw new IllegalArgumentException("status has to have the same writeable name as task. status: " +
|
||||
status.getWriteableName() + " task: " + taskName);
|
||||
state.getWriteableName() + " task: " + taskName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -318,7 +313,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
} else {
|
||||
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
|
||||
}
|
||||
status = in.readOptionalNamedWriteable(Task.Status.class);
|
||||
state = in.readOptionalNamedWriteable(PersistentTaskState.class);
|
||||
assignment = new Assignment(in.readOptionalString(), in.readString());
|
||||
allocationIdOnLastStatusUpdate = in.readOptionalLong();
|
||||
}
|
||||
|
@ -333,7 +328,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
} else {
|
||||
out.writeOptionalNamedWriteable(params);
|
||||
}
|
||||
out.writeOptionalNamedWriteable(status);
|
||||
out.writeOptionalNamedWriteable(state);
|
||||
out.writeOptionalString(assignment.executorNode);
|
||||
out.writeString(assignment.explanation);
|
||||
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
|
||||
|
@ -348,15 +343,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
allocationId == that.allocationId &&
|
||||
Objects.equals(taskName, that.taskName) &&
|
||||
Objects.equals(params, that.params) &&
|
||||
Objects.equals(status, that.status) &&
|
||||
Objects.equals(state, that.state) &&
|
||||
Objects.equals(assignment, that.assignment) &&
|
||||
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, allocationId, taskName, params, status, assignment,
|
||||
allocationIdOnLastStatusUpdate);
|
||||
return Objects.hash(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -395,8 +389,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
}
|
||||
|
||||
@Nullable
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
public PersistentTaskState getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -411,8 +405,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
if (params != null) {
|
||||
builder.field("params", params, xParams);
|
||||
}
|
||||
if (status != null) {
|
||||
builder.field("status", status, xParams);
|
||||
if (state != null) {
|
||||
builder.field("state", state, xParams);
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
|
@ -448,7 +442,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
private long allocationId;
|
||||
private String taskName;
|
||||
private Params params;
|
||||
private Status status;
|
||||
private PersistentTaskState state;
|
||||
private Assignment assignment = INITIAL_ASSIGNMENT;
|
||||
private Long allocationIdOnLastStatusUpdate;
|
||||
|
||||
|
@ -472,8 +466,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
return this;
|
||||
}
|
||||
|
||||
public TaskBuilder<Params> setStatus(Status status) {
|
||||
this.status = status;
|
||||
public TaskBuilder<Params> setState(PersistentTaskState state) {
|
||||
this.state = state;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -489,8 +483,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
}
|
||||
|
||||
public PersistentTask<Params> build() {
|
||||
return new PersistentTask<>(id, allocationId, taskName, params, status,
|
||||
assignment, allocationIdOnLastStatusUpdate);
|
||||
return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -608,13 +601,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
|
|||
}
|
||||
|
||||
/**
|
||||
* Updates the task status
|
||||
* Updates the task state
|
||||
*/
|
||||
public Builder updateTaskStatus(String taskId, Status status) {
|
||||
public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) {
|
||||
PersistentTask<?> taskInProgress = tasks.get(taskId);
|
||||
if (taskInProgress != null) {
|
||||
changed = true;
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
|
||||
tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState));
|
||||
} else {
|
||||
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists");
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
||||
import java.util.Map;
|
||||
|
@ -118,7 +117,7 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
|
|||
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
|
||||
* indicate that the persistent task has finished.
|
||||
*/
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status);
|
||||
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable PersistentTaskState state);
|
||||
|
||||
public String getExecutor() {
|
||||
return executor;
|
||||
|
|
|
@ -50,13 +50,13 @@ import static java.util.Objects.requireNonNull;
|
|||
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
|
||||
*/
|
||||
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final Map<Long, AllocatedPersistentTask> runningTasks = new HashMap<>();
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
|
||||
private final TaskManager taskManager;
|
||||
private final NodePersistentTasksExecutor nodePersistentTasksExecutor;
|
||||
|
||||
|
||||
public PersistentTasksNodeService(Settings settings,
|
||||
PersistentTasksService persistentTasksService,
|
||||
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
|
||||
|
@ -172,7 +172,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
task.getPersistentTaskId(), task.getAllocationId());
|
||||
try {
|
||||
runningTasks.put(taskInProgress.getAllocationId(), task);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor);
|
||||
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getState(), task, executor);
|
||||
} catch (Exception e) {
|
||||
// Submit task failure
|
||||
task.markAsFailed(e);
|
||||
|
@ -215,8 +215,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static class Status implements Task.Status {
|
||||
|
||||
public static final String NAME = "persistent_executor";
|
||||
|
||||
private final AllocatedPersistentTask.State state;
|
||||
|
@ -252,10 +252,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
|
|||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
public AllocatedPersistentTask.State getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -113,13 +112,14 @@ public class PersistentTasksService extends AbstractComponent {
|
|||
* Notifies the master node that the state of a persistent task has changed.
|
||||
* <p>
|
||||
* Persistent task implementers shouldn't call this method directly and use
|
||||
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
|
||||
* {@link AllocatedPersistentTask#updatePersistentTaskState} instead
|
||||
*/
|
||||
void updateStatus(final String taskId,
|
||||
void sendUpdateStateRequest(final String taskId,
|
||||
final long taskAllocationID,
|
||||
final Task.Status status,
|
||||
final PersistentTaskState taskState,
|
||||
final ActionListener<PersistentTask<?>> listener) {
|
||||
UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status);
|
||||
UpdatePersistentTaskStatusAction.Request request =
|
||||
new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, taskState);
|
||||
execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener);
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -63,16 +62,15 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
|
||||
private String taskId;
|
||||
private long allocationId = -1L;
|
||||
private Task.Status status;
|
||||
private PersistentTaskState state;
|
||||
|
||||
public Request() {
|
||||
|
||||
}
|
||||
|
||||
public Request(String taskId, long allocationId, Task.Status status) {
|
||||
public Request(String taskId, long allocationId, PersistentTaskState state) {
|
||||
this.taskId = taskId;
|
||||
this.allocationId = allocationId;
|
||||
this.status = status;
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
public void setTaskId(String taskId) {
|
||||
|
@ -83,8 +81,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
this.allocationId = allocationId;
|
||||
}
|
||||
|
||||
public void setStatus(Task.Status status) {
|
||||
this.status = status;
|
||||
public void setState(PersistentTaskState state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,7 +90,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
super.readFrom(in);
|
||||
taskId = in.readString();
|
||||
allocationId = in.readLong();
|
||||
status = in.readOptionalNamedWriteable(Task.Status.class);
|
||||
state = in.readOptionalNamedWriteable(PersistentTaskState.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -100,7 +98,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
super.writeTo(out);
|
||||
out.writeString(taskId);
|
||||
out.writeLong(allocationId);
|
||||
out.writeOptionalNamedWriteable(status);
|
||||
out.writeOptionalNamedWriteable(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,13 +120,12 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Request request = (Request) o;
|
||||
return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId &&
|
||||
Objects.equals(status, request.status);
|
||||
return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && Objects.equals(state, request.state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(taskId, allocationId, status);
|
||||
return Objects.hash(taskId, allocationId, state);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,11 +141,10 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
return this;
|
||||
}
|
||||
|
||||
public final RequestBuilder setStatus(Task.Status status) {
|
||||
request.setStatus(status);
|
||||
public final RequestBuilder setState(PersistentTaskState state) {
|
||||
request.setState(state);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
|
||||
|
@ -182,9 +178,10 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
|
|||
}
|
||||
|
||||
@Override
|
||||
protected final void masterOperation(final Request request, ClusterState state,
|
||||
protected final void masterOperation(final Request request,
|
||||
final ClusterState state,
|
||||
final ActionListener<PersistentTaskResponse> listener) {
|
||||
persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status,
|
||||
persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
|
||||
new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> task) {
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
|
|||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -649,7 +648,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}));
|
||||
|
|
|
@ -42,10 +42,9 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Builder;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -79,7 +78,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
randomAssignment());
|
||||
if (randomBoolean()) {
|
||||
// From time to time update status
|
||||
tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10)));
|
||||
tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10)));
|
||||
}
|
||||
}
|
||||
return tasks.build();
|
||||
|
@ -96,7 +95,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
|
||||
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
|
||||
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
|
||||
new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||
new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -118,7 +117,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
if (builder.getCurrentTaskIds().isEmpty()) {
|
||||
addRandomTask(builder);
|
||||
} else {
|
||||
builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
|
||||
builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
|
@ -155,9 +154,10 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
@Override
|
||||
protected NamedXContentRegistry xContentRegistry() {
|
||||
return new NamedXContentRegistry(Arrays.asList(
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
||||
TestParams::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class,
|
||||
new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentTaskState.class,
|
||||
new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -186,7 +186,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
// Things that should be serialized
|
||||
assertEquals(testTask.getTaskName(), newTask.getTaskName());
|
||||
assertEquals(testTask.getId(), newTask.getId());
|
||||
assertEquals(testTask.getStatus(), newTask.getStatus());
|
||||
assertEquals(testTask.getState(), newTask.getState());
|
||||
assertEquals(testTask.getParams(), newTask.getParams());
|
||||
|
||||
// Things that shouldn't be serialized
|
||||
|
@ -224,10 +224,10 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
|
|||
case 2:
|
||||
if (builder.hasTask(lastKnownTask)) {
|
||||
changed = true;
|
||||
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
|
||||
builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
|
||||
} else {
|
||||
String fLastKnownTask = lastKnownTask;
|
||||
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null));
|
||||
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null));
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -64,7 +63,7 @@ public abstract class PersistentTasksDecidersTestCase extends ESTestCase {
|
|||
public <Params extends PersistentTaskParams> PersistentTasksExecutor<Params> getPersistentTaskExecutorSafe(String taskName) {
|
||||
return new PersistentTasksExecutor<Params>(clusterService.getSettings(), taskName, null) {
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {
|
||||
logger.debug("Executing task {}", task);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.tasks.TaskInfo;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
|
||||
|
@ -190,11 +190,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
|
||||
.custom(PersistentTasksCustomMetaData.TYPE);
|
||||
assertThat(tasksInProgress.tasks().size(), equalTo(1));
|
||||
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
|
||||
assertThat(tasksInProgress.tasks().iterator().next().getState(), nullValue());
|
||||
|
||||
int numberOfUpdates = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numberOfUpdates; i++) {
|
||||
logger.info("Updating the task status");
|
||||
logger.info("Updating the task states");
|
||||
// Complete the running task and make sure it finishes properly
|
||||
assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId())
|
||||
.get().getTasks().size(), equalTo(1));
|
||||
|
@ -202,8 +202,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
int finalI = i;
|
||||
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
|
||||
persistentTasksService.waitForPersistentTaskCondition(taskId,
|
||||
task -> task != null && task.getStatus() != null && task.getStatus().toString() != null &&
|
||||
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
|
||||
task -> task != null && task.getState() != null && task.getState().toString() != null &&
|
||||
task.getState().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
|
||||
TimeValue.timeValueSeconds(10), future1);
|
||||
assertThat(future1.get().getId(), equalTo(taskId));
|
||||
}
|
||||
|
@ -215,7 +215,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
|
|||
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
|
||||
|
||||
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
|
||||
persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture);
|
||||
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), failedUpdateFuture);
|
||||
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
|
||||
" and allocation id -2 doesn't exist");
|
||||
|
||||
|
|
|
@ -210,13 +210,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
|
||||
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
|
||||
|
||||
Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase");
|
||||
PersistentTaskState taskState = new TestPersistentTasksPlugin.State("_test_phase");
|
||||
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
|
||||
String taskId = UUIDs.base64UUID();
|
||||
TestParams taskParams = new TestParams("other_0");
|
||||
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams,
|
||||
new Assignment("this_node", "test assignment on other node"));
|
||||
tasks.updateTaskStatus(taskId, status);
|
||||
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, new Assignment("this_node", "test assignment on other node"));
|
||||
tasks.updateTaskState(taskId, taskState);
|
||||
MetaData.Builder metaData = MetaData.builder(state.metaData());
|
||||
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
|
||||
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
|
||||
|
@ -225,7 +224,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
|
||||
assertThat(executor.size(), equalTo(1));
|
||||
assertThat(executor.get(0).params, sameInstance(taskParams));
|
||||
assertThat(executor.get(0).status, sameInstance(status));
|
||||
assertThat(executor.get(0).state, sameInstance(taskState));
|
||||
assertThat(executor.get(0).task, sameInstance(nodeTask));
|
||||
}
|
||||
|
||||
|
@ -331,15 +330,16 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private class Execution {
|
||||
|
||||
private final PersistentTaskParams params;
|
||||
private final AllocatedPersistentTask task;
|
||||
private final Task.Status status;
|
||||
private final PersistentTaskState state;
|
||||
private final PersistentTasksExecutor<?> holder;
|
||||
|
||||
Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor<?> holder) {
|
||||
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTaskState state, PersistentTasksExecutor<?> holder) {
|
||||
this.params = params;
|
||||
this.task = task;
|
||||
this.status = status;
|
||||
this.state = state;
|
||||
this.holder = holder;
|
||||
}
|
||||
}
|
||||
|
@ -352,11 +352,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public <Params extends PersistentTaskParams> void executeTask(Params params,
|
||||
Task.Status status,
|
||||
AllocatedPersistentTask task,
|
||||
PersistentTasksExecutor<Params> executor) {
|
||||
executions.add(new Execution(params, task, status, executor));
|
||||
public <Params extends PersistentTaskParams> void executeTask(final Params params,
|
||||
final PersistentTaskState state,
|
||||
final AllocatedPersistentTask task,
|
||||
final PersistentTasksExecutor<Params> executor) {
|
||||
executions.add(new Execution(params, task, state, executor));
|
||||
}
|
||||
|
||||
public Execution get(int i) {
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
|
|||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.PersistentTaskPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskCancelledException;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -100,16 +99,17 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
|
||||
return Arrays.asList(
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NamedXContentRegistry.Entry> getNamedXContent() {
|
||||
return Arrays.asList(
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
|
||||
TestParams::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class,
|
||||
new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentTaskState.class,
|
||||
new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -221,22 +221,22 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
}
|
||||
}
|
||||
|
||||
public static class Status implements Task.Status {
|
||||
public static class State implements PersistentTaskState {
|
||||
|
||||
private final String phase;
|
||||
|
||||
public static final ConstructingObjectParser<Status, Void> STATUS_PARSER =
|
||||
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new Status((String) args[0]));
|
||||
public static final ConstructingObjectParser<State, Void> STATE_PARSER =
|
||||
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new State((String) args[0]));
|
||||
|
||||
static {
|
||||
STATUS_PARSER.declareString(constructorArg(), new ParseField("phase"));
|
||||
STATE_PARSER.declareString(constructorArg(), new ParseField("phase"));
|
||||
}
|
||||
|
||||
public Status(String phase) {
|
||||
public State(String phase) {
|
||||
this.phase = requireNonNull(phase, "Phase cannot be null");
|
||||
}
|
||||
|
||||
public Status(StreamInput in) throws IOException {
|
||||
public State(StreamInput in) throws IOException {
|
||||
phase = in.readString();
|
||||
}
|
||||
|
||||
|
@ -253,11 +253,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
return builder;
|
||||
}
|
||||
|
||||
public static Task.Status fromXContent(XContentParser parser) throws IOException {
|
||||
return STATUS_PARSER.parse(parser, null);
|
||||
public static PersistentTaskState fromXContent(XContentParser parser) throws IOException {
|
||||
return STATE_PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
|
@ -276,10 +275,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
// Implements equals and hashcode for testing
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null || obj.getClass() != Status.class) {
|
||||
if (obj == null || obj.getClass() != State.class) {
|
||||
return false;
|
||||
}
|
||||
Status other = (Status) obj;
|
||||
State other = (State) obj;
|
||||
return phase.equals(other.phase);
|
||||
}
|
||||
|
||||
|
@ -289,7 +288,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestParams> {
|
||||
|
||||
public static final String NAME = "cluster:admin/persistent/test";
|
||||
|
@ -317,7 +315,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, PersistentTaskState state) {
|
||||
logger.info("started node operation for the task {}", task);
|
||||
try {
|
||||
TestTask testTask = (TestTask) task;
|
||||
|
@ -340,9 +338,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
} else if ("update_status".equals(testTask.getOperation())) {
|
||||
testTask.setOperation(null);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Status newStatus = new Status("phase " + phase.incrementAndGet());
|
||||
logger.info("updating the task status to {}", newStatus);
|
||||
task.updatePersistentStatus(newStatus, new ActionListener<PersistentTask<?>>() {
|
||||
State newState = new State("phase " + phase.incrementAndGet());
|
||||
logger.info("updating the task state to {}", newState);
|
||||
task.updatePersistentTaskState(newState, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("updating was successful");
|
||||
|
@ -540,5 +538,4 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
|
|||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -20,9 +20,8 @@ package org.elasticsearch.persistent;
|
|||
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.AbstractStreamableTestCase;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
|
||||
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request;
|
||||
|
||||
|
@ -32,7 +31,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
|
|||
|
||||
@Override
|
||||
protected Request createTestInstance() {
|
||||
return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10)));
|
||||
return new Request(UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -43,7 +42,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
|
|||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(Collections.singletonList(
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.license.PostStartBasicAction;
|
|||
import org.elasticsearch.license.PostStartTrialAction;
|
||||
import org.elasticsearch.license.PutLicenseAction;
|
||||
import org.elasticsearch.persistent.PersistentTaskParams;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.NetworkPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -89,7 +90,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
|
||||
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
|
||||
import org.elasticsearch.xpack.core.rollup.RollupField;
|
||||
|
@ -325,9 +326,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
StartDatafeedAction.DatafeedParams::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
|
||||
OpenJobAction.JobParams::new),
|
||||
// ML - Task statuses
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream),
|
||||
// ML - Task states
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream),
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.MACHINE_LEARNING,
|
||||
MachineLearningFeatureSetUsage::new),
|
||||
// monitoring
|
||||
|
@ -350,7 +351,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
// rollup
|
||||
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new)
|
||||
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
|
||||
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -365,9 +367,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
StartDatafeedAction.DatafeedParams::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
|
||||
OpenJobAction.JobParams::fromXContent),
|
||||
// ML - Task statuses
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent),
|
||||
// ML - Task states
|
||||
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(JobTaskState.NAME), JobTaskState::fromXContent),
|
||||
// watcher
|
||||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(WatcherMetaData.TYPE),
|
||||
WatcherMetaData::fromXContent),
|
||||
|
@ -375,8 +377,12 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
|
|||
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(LicensesMetaData.TYPE),
|
||||
LicensesMetaData::fromXContent),
|
||||
//rollup
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), RollupJob::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent)
|
||||
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME),
|
||||
RollupJob::fromXContent),
|
||||
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME),
|
||||
RollupJobStatus::fromXContent),
|
||||
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(RollupJobStatus.NAME),
|
||||
RollupJobStatus::fromXContent)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
|||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
@ -402,9 +402,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||
if (allowDeleteOpenJob == false) {
|
||||
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
|
||||
if (jobTask != null) {
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus();
|
||||
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
|
||||
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
|
||||
+ ((jobTaskStatus == null) ? JobState.OPENING : jobTaskStatus.getState()));
|
||||
+ ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState()));
|
||||
}
|
||||
}
|
||||
Job.Builder jobBuilder = new Job.Builder(job);
|
||||
|
@ -448,7 +448,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
|
||||
PersistentTask<?> task = getJobTask(jobId, tasks);
|
||||
if (task != null) {
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
|
||||
JobTaskState jobTaskState = (JobTaskState) task.getState();
|
||||
if (jobTaskState == null) {
|
||||
return JobState.OPENING;
|
||||
}
|
||||
|
@ -460,8 +460,8 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
|
|||
|
||||
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
|
||||
PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
|
||||
if (task != null && task.getStatus() != null) {
|
||||
return (DatafeedState) task.getStatus();
|
||||
if (task != null && task.getState() != null) {
|
||||
return (DatafeedState) task.getState();
|
||||
} else {
|
||||
// If we haven't started a datafeed then there will be no persistent task,
|
||||
// which is the same as if the datafeed was't started
|
||||
|
|
|
@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -20,7 +20,7 @@ import java.util.Locale;
|
|||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
public enum DatafeedState implements Task.Status {
|
||||
public enum DatafeedState implements PersistentTaskState {
|
||||
|
||||
STARTED, STOPPED, STARTING, STOPPING;
|
||||
|
||||
|
|
|
@ -12,25 +12,25 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
|
||||
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
|
||||
public class JobTaskStatus implements Task.Status {
|
||||
public class JobTaskState implements PersistentTaskState {
|
||||
|
||||
public static final String NAME = OpenJobAction.TASK_NAME;
|
||||
|
||||
private static ParseField STATE = new ParseField("state");
|
||||
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
|
||||
|
||||
private static final ConstructingObjectParser<JobTaskStatus, Void> PARSER =
|
||||
private static final ConstructingObjectParser<JobTaskState, Void> PARSER =
|
||||
new ConstructingObjectParser<>(NAME,
|
||||
args -> new JobTaskStatus((JobState) args[0], (Long) args[1]));
|
||||
args -> new JobTaskState((JobState) args[0], (Long) args[1]));
|
||||
|
||||
static {
|
||||
PARSER.declareField(constructorArg(), p -> {
|
||||
|
@ -42,7 +42,7 @@ public class JobTaskStatus implements Task.Status {
|
|||
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
|
||||
}
|
||||
|
||||
public static JobTaskStatus fromXContent(XContentParser parser) {
|
||||
public static JobTaskState fromXContent(XContentParser parser) {
|
||||
try {
|
||||
return PARSER.parse(parser, null);
|
||||
} catch (IOException e) {
|
||||
|
@ -53,12 +53,12 @@ public class JobTaskStatus implements Task.Status {
|
|||
private final JobState state;
|
||||
private final long allocationId;
|
||||
|
||||
public JobTaskStatus(JobState state, long allocationId) {
|
||||
public JobTaskState(JobState state, long allocationId) {
|
||||
this.state = Objects.requireNonNull(state);
|
||||
this.allocationId = allocationId;
|
||||
}
|
||||
|
||||
public JobTaskStatus(StreamInput in) throws IOException {
|
||||
public JobTaskState(StreamInput in) throws IOException {
|
||||
state = JobState.fromStream(in);
|
||||
allocationId = in.readLong();
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ public class JobTaskStatus implements Task.Status {
|
|||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
JobTaskStatus that = (JobTaskStatus) o;
|
||||
JobTaskState that = (JobTaskState) o;
|
||||
return state == that.state &&
|
||||
Objects.equals(allocationId, that.allocationId);
|
||||
}
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -30,7 +31,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
|
|||
* indexer's current position. When the allocated task updates its status,
|
||||
* it is providing a new version of this.
|
||||
*/
|
||||
public class RollupJobStatus implements Task.Status {
|
||||
public class RollupJobStatus implements Task.Status, PersistentTaskState {
|
||||
public static final String NAME = "xpack/rollup/job";
|
||||
|
||||
private final IndexerState state;
|
||||
|
@ -73,7 +74,7 @@ public class RollupJobStatus implements Task.Status {
|
|||
currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null;
|
||||
}
|
||||
|
||||
public IndexerState getState() {
|
||||
public IndexerState getIndexerState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
|||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
@ -256,8 +256,8 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
|
|||
@Override
|
||||
protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask,
|
||||
ActionListener<CloseJobAction.Response> listener) {
|
||||
JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
|
||||
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
|
||||
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
|
||||
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
|
|
@ -39,12 +39,12 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.plugins.MapperPlugin;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -57,7 +57,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
|
||||
|
@ -208,7 +208,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
persistentTasks.findTasks(OpenJobAction.TASK_NAME,
|
||||
task -> node.getId().equals(task.getExecutorNode()));
|
||||
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
|
||||
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
|
||||
JobState jobState;
|
||||
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
|
||||
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
|
||||
|
@ -675,14 +675,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, PersistentTaskState state) {
|
||||
JobTask jobTask = (JobTask) task;
|
||||
jobTask.autodetectProcessManager = autodetectProcessManager;
|
||||
JobTaskStatus jobStateStatus = (JobTaskStatus) status;
|
||||
JobTaskState jobTaskState = (JobTaskState) state;
|
||||
// If the job is failed then the Persistent Task Service will
|
||||
// try to restart it on a node restart. Exiting here leaves the
|
||||
// job in the failed state and it must be force closed.
|
||||
if (jobStateStatus != null && jobStateStatus.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
|
||||
if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -766,8 +766,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
|
|||
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
|
||||
JobState jobState = JobState.CLOSED;
|
||||
if (persistentTask != null) {
|
||||
JobTaskStatus jobStateStatus = (JobTaskStatus) persistentTask.getStatus();
|
||||
jobState = jobStateStatus == null ? JobState.OPENING : jobStateStatus.getState();
|
||||
JobTaskState jobTaskState = (JobTaskState) persistentTask.getState();
|
||||
jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState();
|
||||
|
||||
PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
|
||||
// This logic is only appropriate when opening a job, not when reallocating following a failure,
|
||||
|
|
|
@ -23,8 +23,8 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -274,8 +274,9 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, StartDatafeedAction.DatafeedParams params,
|
||||
Task.Status status) {
|
||||
protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTask,
|
||||
final StartDatafeedAction.DatafeedParams params,
|
||||
final PersistentTaskState state) {
|
||||
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
|
||||
datafeedTask.datafeedManager = datafeedManager;
|
||||
datafeedManager.run(datafeedTask,
|
||||
|
@ -373,7 +374,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
|
|||
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
|
||||
return true;
|
||||
}
|
||||
DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus();
|
||||
DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
|
||||
return datafeedState == DatafeedState.STARTED;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -222,10 +222,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTaskTask,
|
||||
protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask,
|
||||
ActionListener<StopDatafeedAction.Response> listener) {
|
||||
DatafeedState taskStatus = DatafeedState.STOPPING;
|
||||
datafeedTaskTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
|
||||
DatafeedState taskState = DatafeedState.STOPPING;
|
||||
datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
|
||||
// we need to fork because we are now on a network threadpool
|
||||
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
@ -235,7 +235,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
datafeedTaskTask.stop("stop_datafeed (api)", request.getStopTimeout());
|
||||
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
|
||||
listener.onResponse(new StopDatafeedAction.Response(true));
|
||||
}
|
||||
});
|
||||
|
|
|
@ -88,7 +88,7 @@ public class DatafeedManager extends AbstractComponent {
|
|||
datafeedJob -> {
|
||||
Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler);
|
||||
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
|
||||
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
|
||||
task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
taskRunner.runWhenJobIsOpened(task);
|
||||
|
|
|
@ -12,12 +12,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
@ -64,11 +64,11 @@ public class DatafeedNodeSelector {
|
|||
PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector();
|
||||
priorityFailureCollector.add(verifyIndicesActive(datafeed));
|
||||
|
||||
JobTaskStatus taskStatus = null;
|
||||
JobTaskState jobTaskState = null;
|
||||
JobState jobState = JobState.CLOSED;
|
||||
if (jobTask != null) {
|
||||
taskStatus = (JobTaskStatus) jobTask.getStatus();
|
||||
jobState = taskStatus == null ? JobState.OPENING : taskStatus.getState();
|
||||
jobTaskState = (JobTaskState) jobTask.getState();
|
||||
jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState();
|
||||
}
|
||||
|
||||
if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
|
||||
|
@ -78,8 +78,8 @@ public class DatafeedNodeSelector {
|
|||
priorityFailureCollector.add(new AssignmentFailure(reason, true));
|
||||
}
|
||||
|
||||
if (taskStatus != null && taskStatus.isStatusStale(jobTask)) {
|
||||
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale";
|
||||
if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) {
|
||||
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale";
|
||||
priorityFailureCollector.add(new AssignmentFailure(reason, true));
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
|
|||
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
|
||||
|
@ -623,8 +623,8 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
void setJobState(JobTask jobTask, JobState state) {
|
||||
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
|
||||
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId());
|
||||
|
@ -638,8 +638,8 @@ public class AutodetectProcessManager extends AbstractComponent {
|
|||
}
|
||||
|
||||
void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
|
||||
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
|
||||
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
|
||||
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTask<?> persistentTask) {
|
||||
try {
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
|
||||
|
@ -363,7 +363,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
|
|||
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
|
||||
assertEquals(JobState.OPENING, MlMetadata.getJobState("foo", tasksBuilder.build()));
|
||||
|
||||
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("foo"), new JobTaskStatus(JobState.OPENED, tasksBuilder.getLastAllocationId()));
|
||||
tasksBuilder.updateTaskState(MlMetadata.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId()));
|
||||
assertEquals(JobState.OPENED, MlMetadata.getJobState("foo", tasksBuilder.build()));
|
||||
}
|
||||
|
||||
|
|
|
@ -314,7 +314,7 @@ public class TransportCloseJobActionTests extends ESTestCase {
|
|||
PersistentTasksCustomMetaData.Builder tasks) {
|
||||
tasks.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,
|
||||
new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment"));
|
||||
tasks.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state);
|
||||
tasks.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Operator;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
|
@ -329,7 +329,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
|
||||
|
||||
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
|
||||
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null);
|
||||
tasksBuilder.updateTaskState(MlMetadata.jobTaskId("job_id6"), null);
|
||||
tasks = tasksBuilder.build();
|
||||
|
||||
csBuilder = ClusterState.builder(cs);
|
||||
|
@ -630,7 +630,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
|
|||
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
|
||||
new Assignment(nodeId, "test assignment"));
|
||||
if (jobState != null) {
|
||||
builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId()));
|
||||
builder.updateTaskState(MlMetadata.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ public class TransportStopDatafeedActionTests extends ESTestCase {
|
|||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
tasksBuilder.addTask(MLMetadataField.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME,
|
||||
new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
|
||||
tasksBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED);
|
||||
tasksBuilder.updateTaskState(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED);
|
||||
tasksBuilder.build();
|
||||
|
||||
Job job = createDatafeedJob().build(new Date());
|
||||
|
@ -121,6 +121,6 @@ public class TransportStopDatafeedActionTests extends ESTestCase {
|
|||
taskBuilder.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,
|
||||
new StartDatafeedAction.DatafeedParams(datafeedId, startTime),
|
||||
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
|
||||
taskBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state);
|
||||
taskBuilder.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -378,7 +378,7 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(mock(PersistentTask.class));
|
||||
return null;
|
||||
}).when(task).updatePersistentStatus(any(), any());
|
||||
}).when(task).updatePersistentTaskState(any(), any());
|
||||
return task;
|
||||
}
|
||||
|
||||
|
@ -394,7 +394,7 @@ public class DatafeedManagerTests extends ESTestCase {
|
|||
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
|
||||
listener.onResponse(mock(PersistentTask.class));
|
||||
return null;
|
||||
}).when(task).updatePersistentStatus(any(), any());
|
||||
}).when(task).updatePersistentTaskState(any(), any());
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.MLMetadataField;
|
|||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -255,20 +255,20 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
|
|||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
|
||||
// Set to lower allocationId, so job task is stale:
|
||||
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0));
|
||||
tasksBuilder.updateTaskState(MlMetadata.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0));
|
||||
tasks = tasksBuilder.build();
|
||||
|
||||
givenClusterState("foo", 1, 0);
|
||||
|
||||
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
|
||||
assertNull(result.getExecutorNode());
|
||||
assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale",
|
||||
assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale",
|
||||
result.getExplanation());
|
||||
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
||||
() -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
|
||||
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
|
||||
+ "[cannot start datafeed [datafeed_id], job [job_id] status is stale]"));
|
||||
+ "[cannot start datafeed [datafeed_id], job [job_id] state is stale]"));
|
||||
|
||||
tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||
addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
|
||||
|
||||
|
@ -211,9 +211,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
|
||||
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
|
||||
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(JobState.OPENED, jobTaskStatus.getState());
|
||||
JobTaskState jobTaskState = (JobTaskState) task.getState();
|
||||
assertNotNull(jobTaskState);
|
||||
assertEquals(JobState.OPENED, jobTaskState.getState());
|
||||
});
|
||||
|
||||
logger.info("stop the only running ml node");
|
||||
|
@ -264,7 +264,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
|
||||
for (DiscoveryNode node : event.state().nodes()) {
|
||||
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> {
|
||||
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
|
||||
JobTaskState jobTaskState = (JobTaskState) task.getState();
|
||||
return node.getId().equals(task.getExecutorNode()) &&
|
||||
(jobTaskState == null || jobTaskState.isStatusStale(task));
|
||||
});
|
||||
|
@ -396,9 +396,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
|
||||
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
|
||||
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(expectedState, jobTaskStatus.getState());
|
||||
JobTaskState jobTaskState = (JobTaskState) task.getState();
|
||||
assertNotNull(jobTaskState);
|
||||
assertEquals(expectedState, jobTaskState.getState());
|
||||
} else {
|
||||
assertNull(task.getExecutorNode());
|
||||
}
|
||||
|
@ -411,9 +411,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
|
|||
assertEquals(numJobs, tasks.taskMap().size());
|
||||
for (PersistentTask<?> task : tasks.taskMap().values()) {
|
||||
assertNotNull(task.getExecutorNode());
|
||||
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
|
||||
assertNotNull(jobTaskStatus);
|
||||
assertEquals(JobState.OPENED, jobTaskStatus.getState());
|
||||
JobTaskState jobTaskState = (JobTaskState) task.getState();
|
||||
assertNotNull(jobTaskState);
|
||||
assertEquals(JobState.OPENED, jobTaskState.getState());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
|||
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||
|
@ -58,7 +58,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
|
|||
assertEquals(1, tasks.taskMap().size());
|
||||
// now just double check that the first job is still opened:
|
||||
PersistentTasksCustomMetaData.PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("close-failed-job-1"));
|
||||
assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState());
|
||||
assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState());
|
||||
}
|
||||
|
||||
public void testSingleNode() throws Exception {
|
||||
|
|
|
@ -9,22 +9,22 @@ import org.elasticsearch.common.io.stream.Writeable;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
|
||||
public class JobTaskStatusTests extends AbstractSerializingTestCase<JobTaskStatus> {
|
||||
public class JobTaskStateTests extends AbstractSerializingTestCase<JobTaskState> {
|
||||
|
||||
@Override
|
||||
protected JobTaskStatus createTestInstance() {
|
||||
return new JobTaskStatus(randomFrom(JobState.values()), randomLong());
|
||||
protected JobTaskState createTestInstance() {
|
||||
return new JobTaskState(randomFrom(JobState.values()), randomLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<JobTaskStatus> instanceReader() {
|
||||
return JobTaskStatus::new;
|
||||
protected Writeable.Reader<JobTaskState> instanceReader() {
|
||||
return JobTaskState::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JobTaskStatus doParseInstance(XContentParser parser) {
|
||||
return JobTaskStatus.fromXContent(parser);
|
||||
protected JobTaskState doParseInstance(XContentParser parser) {
|
||||
return JobTaskState.fromXContent(parser);
|
||||
}
|
||||
}
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
|
|||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
|
||||
|
@ -199,7 +199,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
manager.openJob(jobTask, e -> {});
|
||||
assertEquals(1, manager.numberOfOpenJobs());
|
||||
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
|
||||
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
|
||||
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any());
|
||||
}
|
||||
|
||||
public void testOpenJob_exceedMaxNumJobs() {
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
|
@ -62,7 +63,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, Status status) {
|
||||
protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, PersistentTaskState state) {
|
||||
RollupJobTask rollupJobTask = (RollupJobTask) task;
|
||||
SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(),
|
||||
new CronSchedule(params.getConfig().getCron()));
|
||||
|
@ -80,7 +81,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
PersistentTasksCustomMetaData.PersistentTask<RollupJob> persistentTask,
|
||||
Map<String, String> headers) {
|
||||
return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(),
|
||||
(RollupJobStatus) persistentTask.getStatus(), client, schedulerEngine, threadPool, headers);
|
||||
(RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,15 +116,15 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
|
||||
if (state.equals(IndexerState.ABORTING)) {
|
||||
protected void doSaveState(IndexerState indexerState, Map<String, Object> position, Runnable next) {
|
||||
if (indexerState.equals(IndexerState.ABORTING)) {
|
||||
// If we're aborting, just invoke `next` (which is likely an onFailure handler)
|
||||
next.run();
|
||||
} else {
|
||||
// Otherwise, attempt to persist our state
|
||||
final RollupJobStatus status = new RollupJobStatus(state, getPosition());
|
||||
logger.debug("Updating persistent status of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]");
|
||||
updatePersistentStatus(status, ActionListener.wrap(task -> next.run(), exc -> next.run()));
|
||||
final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition());
|
||||
logger.debug("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + indexerState.toString() + "]");
|
||||
updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> next.run()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,7 +149,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
private final ThreadPool threadPool;
|
||||
private final RollupIndexer indexer;
|
||||
|
||||
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus status,
|
||||
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
|
||||
Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
|
||||
super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers);
|
||||
this.job = job;
|
||||
|
@ -158,16 +159,17 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
// If status is not null, we are resuming rather than starting fresh.
|
||||
Map<String, Object> initialPosition = null;
|
||||
IndexerState initialState = IndexerState.STOPPED;
|
||||
if (status != null) {
|
||||
logger.debug("We have existing status, setting state to [" + status.getState() + "] " +
|
||||
"and current position to [" + status.getPosition() + "] for job [" + job.getConfig().getId() + "]");
|
||||
if (status.getState().equals(IndexerState.INDEXING)) {
|
||||
if (state != null) {
|
||||
final IndexerState existingState = state.getIndexerState();
|
||||
logger.debug("We have existing state, setting state to [" + existingState + "] " +
|
||||
"and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
|
||||
if (existingState.equals(IndexerState.INDEXING)) {
|
||||
/*
|
||||
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
|
||||
* it is indexing but without the actual indexing thread running.
|
||||
*/
|
||||
initialState = IndexerState.STARTED;
|
||||
} else if (status.getState().equals(IndexerState.ABORTING) || status.getState().equals(IndexerState.STOPPING)) {
|
||||
} else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
|
||||
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
|
||||
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
|
||||
// but it won't be running, and won't delete itself either. Safest option.
|
||||
|
@ -175,9 +177,9 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
// to restore as STOPEPD
|
||||
initialState = IndexerState.STOPPED;
|
||||
} else {
|
||||
initialState = status.getState();
|
||||
initialState = existingState;
|
||||
}
|
||||
initialPosition = status.getPosition();
|
||||
initialPosition = state.getPosition();
|
||||
}
|
||||
this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
|
||||
new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
|
||||
|
@ -227,20 +229,20 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
+ " state was [" + newState + "]"));
|
||||
return;
|
||||
}
|
||||
final RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
|
||||
logger.debug("Updating status for rollup job [" + job.getConfig().getId() + "] to [" + status.getState() + "][" +
|
||||
status.getPosition() + "]");
|
||||
updatePersistentStatus(status,
|
||||
final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
|
||||
logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" +
|
||||
state.getPosition() + "]");
|
||||
updatePersistentTaskState(state,
|
||||
ActionListener.wrap(
|
||||
(task) -> {
|
||||
logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() + "] to ["
|
||||
+ status.getState() + "][" + status.getPosition() + "]");
|
||||
logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + "] to ["
|
||||
+ state.getIndexerState() + "][" + state.getPosition() + "]");
|
||||
listener.onResponse(new StartRollupJobAction.Response(true));
|
||||
},
|
||||
(exc) -> {
|
||||
listener.onFailure(
|
||||
new ElasticsearchException("Error while updating status for rollup job [" + job.getConfig().getId()
|
||||
+ "] to [" + status.getState() + "].", exc)
|
||||
new ElasticsearchException("Error while updating state for rollup job [" + job.getConfig().getId()
|
||||
+ "] to [" + state.getIndexerState() + "].", exc)
|
||||
);
|
||||
}
|
||||
)
|
||||
|
@ -268,17 +270,17 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|||
case STOPPING:
|
||||
// update the persistent state only if there is no background job running,
|
||||
// otherwise the state is updated by the indexer when the background job detects the STOPPING state.
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition());
|
||||
updatePersistentStatus(status,
|
||||
RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition());
|
||||
updatePersistentTaskState(state,
|
||||
ActionListener.wrap(
|
||||
(task) -> {
|
||||
logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId()
|
||||
+ "] to [" + status.getState() + "]");
|
||||
logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId()
|
||||
+ "] to [" + state.getIndexerState() + "]");
|
||||
listener.onResponse(new StopRollupJobAction.Response(true));
|
||||
},
|
||||
(exc) -> {
|
||||
listener.onFailure(new ElasticsearchException("Error while updating status for rollup job ["
|
||||
+ job.getConfig().getId() + "] to [" + status.getState() + "].", exc));
|
||||
listener.onFailure(new ElasticsearchException("Error while updating state for rollup job ["
|
||||
+ job.getConfig().getId() + "] to [" + state.getIndexerState() + "].", exc));
|
||||
})
|
||||
);
|
||||
break;
|
||||
|
|
|
@ -7,10 +7,8 @@ package org.elasticsearch.xpack.rollup;
|
|||
|
||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
|
@ -27,7 +25,6 @@ import org.elasticsearch.search.aggregations.Aggregation;
|
|||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.Netty4Plugin;
|
||||
|
@ -104,7 +101,7 @@ public class RollupIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
@Before
|
||||
public void createIndex() throws Exception {
|
||||
public void createIndex() {
|
||||
client().admin().indices().prepareCreate("test-1").addMapping("doc", "{\"doc\": {\"properties\": {" +
|
||||
"\"date_histo\": {\"type\": \"date\"}, " +
|
||||
"\"histo\": {\"type\": \"integer\"}, " +
|
||||
|
@ -125,7 +122,7 @@ public class RollupIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
BulkResponse response = bulk.get();
|
||||
bulk.get();
|
||||
client().admin().indices().prepareRefresh("test-1").get();
|
||||
}
|
||||
|
||||
|
@ -195,27 +192,23 @@ public class RollupIT extends ESIntegTestCase {
|
|||
|
||||
// Make sure it started
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern");
|
||||
if (rollupJobStatus == null) {;
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern");
|
||||
if (rollupJobStatus == null) {
|
||||
fail("null");
|
||||
}
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING));
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
// And wait for it to finish
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern");
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern");
|
||||
if (rollupJobStatus == null) {
|
||||
fail("null");
|
||||
}
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null);
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
|
@ -274,23 +267,20 @@ public class RollupIT extends ESIntegTestCase {
|
|||
|
||||
// Make sure it started
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1");
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("job1");
|
||||
if (rollupJobStatus == null) {
|
||||
fail("null");
|
||||
}
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING));
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
//but not the other task
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2");
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("job2");
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
assertTrue(state.equals(IndexerState.STOPPED));
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
|
@ -301,9 +291,7 @@ public class RollupIT extends ESIntegTestCase {
|
|||
|
||||
// Make sure the first job's task is gone
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1");
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("job1");
|
||||
assertTrue(rollupJobStatus == null);
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
|
@ -320,10 +308,9 @@ public class RollupIT extends ESIntegTestCase {
|
|||
|
||||
// and still STOPPED
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2");
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus("job2");
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
assertTrue(state.equals(IndexerState.STOPPED));
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
@ -404,19 +391,17 @@ public class RollupIT extends ESIntegTestCase {
|
|||
Assert.assertThat(response.isStarted(), equalTo(true));
|
||||
|
||||
ESTestCase.assertBusy(() -> {
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId);
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId);
|
||||
if (rollupJobStatus == null) {
|
||||
fail("null");
|
||||
}
|
||||
|
||||
IndexerState state = rollupJobStatus.getState();
|
||||
IndexerState state = rollupJobStatus.getIndexerState();
|
||||
logger.error("state: [" + state + "]");
|
||||
assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null);
|
||||
}, 60, TimeUnit.SECONDS);
|
||||
|
||||
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId);
|
||||
RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId);
|
||||
if (rollupJobStatus == null) {
|
||||
Assert.fail("rollup job status should not be null");
|
||||
}
|
||||
|
@ -481,11 +466,13 @@ public class RollupIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private RollupJobStatus getRollupJobStatus(ListTasksResponse tasksResponse, String taskId) {
|
||||
for (TaskInfo task : tasksResponse.getTasks()) {
|
||||
if (task.getDescription().equals("rollup_" + taskId)) {
|
||||
return ((RollupJobStatus) task.getStatus());
|
||||
}
|
||||
private RollupJobStatus getRollupJobStatus(final String taskId) {
|
||||
final GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(taskId);
|
||||
final GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, request).actionGet();
|
||||
|
||||
if (response.getJobs() != null && response.getJobs().isEmpty() == false) {
|
||||
assertThat("Expect 1 rollup job with id " + taskId, response.getJobs().size(), equalTo(1));
|
||||
return response.getJobs().iterator().next().getStatus();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -498,13 +485,13 @@ public class RollupIT extends ESIntegTestCase {
|
|||
for (GetRollupJobsAction.JobWrapper job : response.getJobs()) {
|
||||
StopRollupJobAction.Request stopRequest = new StopRollupJobAction.Request(job.getJob().getId());
|
||||
try {
|
||||
StopRollupJobAction.Response stopResponse = client().execute(StopRollupJobAction.INSTANCE, stopRequest).get();
|
||||
client().execute(StopRollupJobAction.INSTANCE, stopRequest).get();
|
||||
} catch (ElasticsearchException e) {
|
||||
//
|
||||
}
|
||||
|
||||
DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request(job.getJob().getId());
|
||||
DeleteRollupJobAction.Response deleteResponse = client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get();
|
||||
client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
||||
|
@ -64,7 +65,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
}
|
||||
|
@ -77,7 +78,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
}
|
||||
|
@ -90,7 +91,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
}
|
||||
|
@ -103,7 +104,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
}
|
||||
|
@ -116,7 +117,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
}
|
||||
|
@ -128,7 +129,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
}
|
||||
|
||||
|
@ -140,7 +141,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
|
||||
|
@ -172,13 +173,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
int c = counter.get();
|
||||
if (c == 0) {
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
} else if (c == 1) {
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
} else {
|
||||
fail("Should not have updated persistent statuse > 2 times");
|
||||
}
|
||||
|
@ -187,7 +189,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
counter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -195,7 +197,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -207,7 +209,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
latch.await(3, TimeUnit.SECONDS);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
||||
|
||||
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
||||
|
@ -248,14 +250,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
||||
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
|
||||
|
@ -264,7 +267,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -285,14 +288,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
||||
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
||||
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
||||
|
||||
|
@ -301,7 +305,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -313,7 +317,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
latch.await(3, TimeUnit.SECONDS);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event("unrelated", 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); // Should still be started, not INDEXING
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
}
|
||||
|
||||
public void testTrigger() throws InterruptedException {
|
||||
|
@ -325,14 +329,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
||||
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -340,7 +345,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -352,7 +357,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
latch.await(3, TimeUnit.SECONDS);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
||||
}
|
||||
|
||||
|
@ -392,11 +397,12 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
Integer counterValue = counter.getAndIncrement();
|
||||
if (counterValue == 0) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
||||
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
||||
} else if (counterValue == 1) {
|
||||
|
@ -405,14 +411,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
|
||||
task.start(new ActionListener<StartRollupJobAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
started.set(true);
|
||||
}
|
||||
|
||||
|
@ -424,7 +430,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
ESTestCase.awaitBusy(started::get);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
||||
// Allow search response to return now
|
||||
latch.countDown();
|
||||
|
@ -475,11 +481,12 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
Integer counterValue = counter.getAndIncrement();
|
||||
if (counterValue == 0) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
||||
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
||||
} else if (counterValue == 1) {
|
||||
|
@ -488,14 +495,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
|
||||
task.start(new ActionListener<StartRollupJobAction.Response>() {
|
||||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
started.set(true);
|
||||
}
|
||||
|
||||
|
@ -507,7 +514,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
ESTestCase.awaitBusy(started::get);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
||||
// Allow search response to return now
|
||||
latch.countDown();
|
||||
|
@ -524,7 +531,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
||||
|
@ -553,15 +560,16 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
||||
@Override
|
||||
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(status, instanceOf(RollupJobStatus.class));
|
||||
public void updatePersistentTaskState(PersistentTaskState taskState,
|
||||
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
||||
assertThat(taskState, instanceOf(RollupJobStatus.class));
|
||||
int c = counter.get();
|
||||
if (c == 0) {
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
} else if (c == 1) {
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
} else if (c == 2) {
|
||||
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
} else {
|
||||
fail("Should not have updated persistent statuse > 3 times");
|
||||
}
|
||||
|
@ -571,7 +579,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -579,7 +587,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
@Override
|
||||
public void onResponse(StartRollupJobAction.Response response) {
|
||||
assertTrue(response.isStarted());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -591,7 +599,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
latch.await(3, TimeUnit.SECONDS);
|
||||
|
||||
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
||||
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
||||
|
||||
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
||||
|
@ -642,7 +650,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|||
latch.countDown();
|
||||
}
|
||||
};
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
|
||||
task.onCancelled();
|
||||
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
||||
|
|
|
@ -26,13 +26,13 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.persistent.PersistentTaskParams;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.SecuritySettingsSourceField;
|
||||
import org.elasticsearch.transport.Netty4Plugin;
|
||||
|
@ -70,7 +70,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
|||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
|
@ -449,8 +449,8 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
|
|||
StartDatafeedAction.DatafeedParams::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
|
||||
OpenJobAction.JobParams::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
|
||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
|
||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
|
||||
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
|
||||
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
|
||||
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
|
||||
|
|
Loading…
Reference in New Issue