Uncouple persistent task state and status (#31031)

This pull request removes the relationship between the state
of a persistent task (as stored in the cluster state) and the
status of the task (as reported by the Task APIs and used in
various places), which has been confusing for some time (#29608).

In order to do that, a new PersistentTaskState interface is added.
This interface represents the persisted state of a persistent task.
The methods used to update the state of persistent tasks are
renamed accordingly: updatePersistentStatus() becomes
updatePersistentTaskState() and now takes a PersistentTaskState
as a parameter. The Task.Status type has been changed to
PersistentTaskState in all places where it makes sense (in the
persistent tasks custom in the cluster state and in all other
methods that deal with the state of an allocated persistent task).
Tanguy Leroux 2018-06-15 09:26:47 +02:00 committed by GitHub
parent 8c6ee7db54
commit 992c7889ee
42 changed files with 404 additions and 391 deletions
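
As a rough illustration of the API change described in the message above (not part of this commit; SamplePhaseState, the logger and the listener handling are hypothetical), a task-specific state now implements PersistentTaskState instead of Task.Status and is reported through updatePersistentTaskState():

import java.io.IOException;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.persistent.PersistentTaskState;

public class SamplePhaseState implements PersistentTaskState {

    public static final String NAME = "sample_task"; // must match the writeable name of the owning task

    private final String phase;

    public SamplePhaseState(String phase) {
        this.phase = phase;
    }

    public SamplePhaseState(StreamInput in) throws IOException {
        this.phase = in.readString();
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(phase);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.startObject().field("phase", phase).endObject();
    }
}

An AllocatedPersistentTask would then persist a new state from its node operation with something like:

    // inside a hypothetical nodeOperation(...) implementation
    task.updatePersistentTaskState(new SamplePhaseState("started"), ActionListener.wrap(
            persistentTask -> logger.info("task state persisted"),
            e -> logger.warn("failed to persist task state", e)));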

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
@ -77,8 +76,9 @@ public class AllocatedPersistentTask extends CancellableTask {
* <p>
* This doesn't affect the status of this allocated task.
*/
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
public void updatePersistentTaskState(final PersistentTaskState state,
final ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, listener);
}
public String getPersistentTaskId() {
@ -116,7 +116,7 @@ public class AllocatedPersistentTask extends CancellableTask {
}
protected final boolean isCompleted() {
return state.get() == State.COMPLETED;
return state.get() == State.COMPLETED;
}
boolean markAsCancelled() {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
/**
@ -29,16 +28,17 @@ import org.elasticsearch.threadpool.ThreadPool;
* It abstracts away the execution of tasks and greatly simplifies testing of PersistentTasksNodeService
*/
public class NodePersistentTasksExecutor {
private final ThreadPool threadPool;
public NodePersistentTasksExecutor(ThreadPool threadPool) {
NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}
public <Params extends PersistentTaskParams> void executeTask(Params params,
@Nullable Task.Status status,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
public <Params extends PersistentTaskParams> void executeTask(final Params params,
final @Nullable PersistentTaskState state,
final AllocatedPersistentTask task,
final PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -49,14 +49,12 @@ public class NodePersistentTasksExecutor {
@Override
protected void doRun() throws Exception {
try {
executor.nodeOperation(task, params, status);
executor.nodeOperation(task, params, state);
} catch (Exception ex) {
task.markAsFailed(ex);
}
}
});
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;
/**
* {@link PersistentTaskState} represents the state of the persistent tasks, as it
* is persisted in the cluster state.
*/
public interface PersistentTaskState extends ToXContentObject, NamedWriteable {
}
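
Since the new interface extends NamedWriteable and ToXContentObject, a state implementation also has to be registered by its plugin so it can be deserialized from the cluster state. A sketch of that registration, mirroring the pattern used by TestPersistentTasksPlugin further down in this diff (the SamplePhaseState class and its fromXContent parser are hypothetical; imports and the enclosing plugin class are assumed):

    // overrides on the plugin class
    @Override
    public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
        return Collections.singletonList(
                new NamedWriteableRegistry.Entry(PersistentTaskState.class, SamplePhaseState.NAME, SamplePhaseState::new));
    }

    @Override
    public List<NamedXContentRegistry.Entry> getNamedXContent() {
        return Collections.singletonList(
                new NamedXContentRegistry.Entry(PersistentTaskState.class,
                        new ParseField(SamplePhaseState.NAME), SamplePhaseState::fromXContent));
    }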

View File

@ -35,7 +35,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.decider.AssignmentDecision;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.tasks.Task;
import java.util.Objects;
@ -178,27 +177,30 @@ public class PersistentTasksClusterService extends AbstractComponent implements
}
/**
* Update task status
* Update the state of a persistent task
*
* @param id the id of a persistent task
* @param allocationId the expected allocation id of the persistent task
* @param status new status
* @param listener the listener that will be called when task is removed
* @param taskId the id of a persistent task
* @param taskAllocationId the expected allocation id of the persistent task
* @param taskState new state
* @param listener the listener that will be called when task is removed
*/
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
public void updatePersistentTaskState(final String taskId,
final long taskAllocationId,
final PersistentTaskState taskState,
final ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task state", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(id, allocationId)) {
return update(currentState, tasksInProgress.updateTaskStatus(id, status));
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
return update(currentState, tasksInProgress.updateTaskState(taskId, taskState));
} else {
if (tasksInProgress.hasTask(id)) {
logger.warn("trying to update status on task {} with unexpected allocation id {}", id, allocationId);
if (tasksInProgress.hasTask(taskId)) {
logger.warn("trying to update state on task {} with unexpected allocation id {}", taskId, taskAllocationId);
} else {
logger.warn("trying to update status on non-existing task {}", id);
logger.warn("trying to update state on non-existing task {}", taskId);
}
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", id, allocationId);
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
}
}
@ -209,7 +211,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id));
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
}
});
}

View File

@ -38,8 +38,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import java.io.IOException;
import java.util.Collection;
@ -61,13 +59,12 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
* A cluster state record that contains a list of all running persistent tasks
*/
public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
public static final String TYPE = "persistent_tasks";
public static final String TYPE = "persistent_tasks";
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
// TODO: Implement custom Diff for tasks
private final Map<String, PersistentTask<?>> tasks;
private final long lastAllocationId;
public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, PersistentTask<?>> tasks) {
@ -94,8 +91,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
ObjectParser<TaskDescriptionBuilder<PersistentTaskParams>, String> parser = new ObjectParser<>("named");
parser.declareObject(TaskDescriptionBuilder::setParams,
(p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params"));
parser.declareObject(TaskDescriptionBuilder::setStatus,
(p, c) -> p.namedObject(Status.class, c, null), new ParseField("status"));
parser.declareObject(TaskDescriptionBuilder::setState,
(p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("state", "status"));
TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name);
// Assignment parser
@ -115,7 +112,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
TaskDescriptionBuilder<PersistentTaskParams> builder = objects.get(0);
taskBuilder.setTaskName(builder.taskName);
taskBuilder.setParams(builder.params);
taskBuilder.setStatus(builder.status);
taskBuilder.setState(builder.state);
}, TASK_DESCRIPTION_PARSER, new ParseField("task"));
PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate,
@ -123,12 +120,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
/**
* Private builder used in XContent parser to build task-specific portion (params and status)
* Private builder used in XContent parser to build task-specific portion (params and state)
*/
private static class TaskDescriptionBuilder<Params extends PersistentTaskParams> {
private final String taskName;
private Params params;
private Status status;
private PersistentTaskState state;
private TaskDescriptionBuilder(String taskName) {
this.taskName = taskName;
@ -139,8 +137,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
private TaskDescriptionBuilder setStatus(Status status) {
this.status = status;
private TaskDescriptionBuilder setState(PersistentTaskState state) {
this.state = state;
return this;
}
}
@ -261,37 +259,34 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* A record that represents a single running persistent task
*/
public static class PersistentTask<P extends PersistentTaskParams> implements Writeable, ToXContentObject {
private final String id;
private final long allocationId;
private final String taskName;
private final P params;
@Nullable
private final Status status;
private final @Nullable PersistentTaskState state;
private final Assignment assignment;
@Nullable
private final Long allocationIdOnLastStatusUpdate;
private final @Nullable Long allocationIdOnLastStatusUpdate;
public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, params, null, assignment, null);
public PersistentTask(final String id, final String name, final P params, final long allocationId, final Assignment assignment) {
this(id, allocationId, name, params, null, assignment, null);
}
public PersistentTask(PersistentTask<P> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.params, task.status,
assignment, task.allocationId);
public PersistentTask(final PersistentTask<P> task, final long allocationId, final Assignment assignment) {
this(task.id, allocationId, task.taskName, task.params, task.state, assignment, task.allocationId);
}
public PersistentTask(PersistentTask<P> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.params, status,
task.assignment, task.allocationId);
public PersistentTask(final PersistentTask<P> task, final PersistentTaskState state) {
this(task.id, task.allocationId, task.taskName, task.params, state, task.assignment, task.allocationId);
}
private PersistentTask(String id, long allocationId, String taskName, P params,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
private PersistentTask(final String id, final long allocationId, final String name, final P params,
final PersistentTaskState state, final Assignment assignment, final Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.taskName = name;
this.params = params;
this.status = status;
this.state = state;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
if (params != null) {
@ -300,10 +295,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
params.getWriteableName() + " task: " + taskName);
}
}
if (status != null) {
if (status.getWriteableName().equals(taskName) == false) {
if (state != null) {
if (state.getWriteableName().equals(taskName) == false) {
throw new IllegalArgumentException("status has to have the same writeable name as task. status: " +
status.getWriteableName() + " task: " + taskName);
state.getWriteableName() + " task: " + taskName);
}
}
}
@ -318,7 +313,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} else {
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
}
status = in.readOptionalNamedWriteable(Task.Status.class);
state = in.readOptionalNamedWriteable(PersistentTaskState.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
}
@ -333,7 +328,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} else {
out.writeOptionalNamedWriteable(params);
}
out.writeOptionalNamedWriteable(status);
out.writeOptionalNamedWriteable(state);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
out.writeOptionalLong(allocationIdOnLastStatusUpdate);
@ -348,15 +343,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(params, that.params) &&
Objects.equals(status, that.status) &&
Objects.equals(state, that.state) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, params, status, assignment,
allocationIdOnLastStatusUpdate);
return Objects.hash(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
}
@Override
@ -395,8 +389,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@Nullable
public Status getStatus() {
return status;
public PersistentTaskState getState() {
return state;
}
@Override
@ -411,8 +405,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
if (params != null) {
builder.field("params", params, xParams);
}
if (status != null) {
builder.field("status", status, xParams);
if (state != null) {
builder.field("state", state, xParams);
}
}
builder.endObject();
@ -448,7 +442,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private long allocationId;
private String taskName;
private Params params;
private Status status;
private PersistentTaskState state;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
@ -472,8 +466,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
public TaskBuilder<Params> setStatus(Status status) {
this.status = status;
public TaskBuilder<Params> setState(PersistentTaskState state) {
this.state = state;
return this;
}
@ -489,8 +483,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public PersistentTask<Params> build() {
return new PersistentTask<>(id, allocationId, taskName, params, status,
assignment, allocationIdOnLastStatusUpdate);
return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate);
}
}
@ -608,13 +601,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
/**
* Updates the task status
* Updates the task state
*/
public Builder updateTaskStatus(String taskId, Status status) {
public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState));
} else {
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists");
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import java.util.Map;
@ -118,7 +117,7 @@ public abstract class PersistentTasksExecutor<Params extends PersistentTaskParam
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished.
*/
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status);
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable PersistentTaskState state);
public String getExecutor() {
return executor;

View File

@ -50,13 +50,13 @@ import static java.util.Objects.requireNonNull;
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
*/
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
private final Map<Long, AllocatedPersistentTask> runningTasks = new HashMap<>();
private final PersistentTasksService persistentTasksService;
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
private final TaskManager taskManager;
private final NodePersistentTasksExecutor nodePersistentTasksExecutor;
public PersistentTasksNodeService(Settings settings,
PersistentTasksService persistentTasksService,
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
@ -172,7 +172,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
task.getPersistentTaskId(), task.getAllocationId());
try {
runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getState(), task, executor);
} catch (Exception e) {
// Submit task failure
task.markAsFailed(e);
@ -215,8 +215,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
}
}
public static class Status implements Task.Status {
public static final String NAME = "persistent_executor";
private final AllocatedPersistentTask.State state;
@ -252,10 +252,6 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
return Strings.toString(this);
}
public AllocatedPersistentTask.State getState() {
return state;
}
@Override
public boolean isFragment() {
return false;

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
@ -113,13 +112,14 @@ public class PersistentTasksService extends AbstractComponent {
* Notifies the master node that the state of a persistent task has changed.
* <p>
* Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
* {@link AllocatedPersistentTask#updatePersistentTaskState} instead
*/
void updateStatus(final String taskId,
final long taskAllocationID,
final Task.Status status,
final ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status);
void sendUpdateStateRequest(final String taskId,
final long taskAllocationID,
final PersistentTaskState taskState,
final ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request request =
new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, taskState);
execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener);
}

View File

@ -39,7 +39,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -63,16 +62,15 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
private String taskId;
private long allocationId = -1L;
private Task.Status status;
private PersistentTaskState state;
public Request() {
}
public Request(String taskId, long allocationId, Task.Status status) {
public Request(String taskId, long allocationId, PersistentTaskState state) {
this.taskId = taskId;
this.allocationId = allocationId;
this.status = status;
this.state = state;
}
public void setTaskId(String taskId) {
@ -83,8 +81,8 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
this.allocationId = allocationId;
}
public void setStatus(Task.Status status) {
this.status = status;
public void setState(PersistentTaskState state) {
this.state = state;
}
@Override
@ -92,7 +90,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
super.readFrom(in);
taskId = in.readString();
allocationId = in.readLong();
status = in.readOptionalNamedWriteable(Task.Status.class);
state = in.readOptionalNamedWriteable(PersistentTaskState.class);
}
@Override
@ -100,7 +98,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
super.writeTo(out);
out.writeString(taskId);
out.writeLong(allocationId);
out.writeOptionalNamedWriteable(status);
out.writeOptionalNamedWriteable(state);
}
@Override
@ -122,13 +120,12 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId &&
Objects.equals(status, request.status);
return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && Objects.equals(state, request.state);
}
@Override
public int hashCode() {
return Objects.hash(taskId, allocationId, status);
return Objects.hash(taskId, allocationId, state);
}
}
@ -144,11 +141,10 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
return this;
}
public final RequestBuilder setStatus(Task.Status status) {
request.setStatus(status);
public final RequestBuilder setState(PersistentTaskState state) {
request.setState(state);
return this;
}
}
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
@ -182,9 +178,10 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
}
@Override
protected final void masterOperation(final Request request, ClusterState state,
protected final void masterOperation(final Request request,
final ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status,
persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {

View File

@ -37,7 +37,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
@ -649,7 +648,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) {
protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) {
throw new UnsupportedOperationException();
}
}));

View File

@ -42,10 +42,9 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Builder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractDiffableSerializationTestCase;
import java.io.IOException;
@ -79,7 +78,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10)));
tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10)));
}
}
return tasks.build();
@ -96,7 +95,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
));
}
@ -118,7 +117,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
}
break;
case 3:
@ -155,9 +154,10 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
new NamedXContentRegistry.Entry(PersistentTaskParams.class,
new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class,
new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent)
));
}
@ -186,7 +186,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
// Things that should be serialized
assertEquals(testTask.getTaskName(), newTask.getTaskName());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getState(), newTask.getState());
assertEquals(testTask.getParams(), newTask.getParams());
// Things that shouldn't be serialized
@ -224,10 +224,10 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
case 2:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null));
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null));
}
break;
case 3:

View File

@ -27,7 +27,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -64,7 +63,7 @@ public abstract class PersistentTasksDecidersTestCase extends ESTestCase {
public <Params extends PersistentTaskParams> PersistentTasksExecutor<Params> getPersistentTaskExecutorSafe(String taskName) {
return new PersistentTasksExecutor<Params>(clusterService.getSettings(), taskName, null) {
@Override
protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) {
protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) {
logger.debug("Executing task {}", task);
}
};

View File

@ -31,7 +31,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
@ -190,11 +190,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasksInProgress.tasks().size(), equalTo(1));
assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue());
assertThat(tasksInProgress.tasks().iterator().next().getState(), nullValue());
int numberOfUpdates = randomIntBetween(1, 10);
for (int i = 0; i < numberOfUpdates; i++) {
logger.info("Updating the task status");
logger.info("Updating the task states");
// Complete the running task and make sure it finishes properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
@ -202,8 +202,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
int finalI = i;
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
persistentTasksService.waitForPersistentTaskCondition(taskId,
task -> task != null && task.getStatus() != null && task.getStatus().toString() != null &&
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
task -> task != null && task.getState() != null && task.getState().toString() != null &&
task.getState().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
TimeValue.timeValueSeconds(10), future1);
assertThat(future1.get().getId(), equalTo(taskId));
}
@ -215,7 +215,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
PlainActionFuture<PersistentTask<?>> failedUpdateFuture = new PlainActionFuture<>();
persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture);
persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), failedUpdateFuture);
assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId +
" and allocation id -2 doesn't exist");

View File

@ -210,13 +210,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase");
PersistentTaskState taskState = new TestPersistentTasksPlugin.State("_test_phase");
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
String taskId = UUIDs.base64UUID();
TestParams taskParams = new TestParams("other_0");
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams,
new Assignment("this_node", "test assignment on other node"));
tasks.updateTaskStatus(taskId, status);
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, new Assignment("this_node", "test assignment on other node"));
tasks.updateTaskState(taskId, taskState);
MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
@ -225,7 +224,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
assertThat(executor.size(), equalTo(1));
assertThat(executor.get(0).params, sameInstance(taskParams));
assertThat(executor.get(0).status, sameInstance(status));
assertThat(executor.get(0).state, sameInstance(taskState));
assertThat(executor.get(0).task, sameInstance(nodeTask));
}
@ -331,15 +330,16 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
private class Execution {
private final PersistentTaskParams params;
private final AllocatedPersistentTask task;
private final Task.Status status;
private final PersistentTaskState state;
private final PersistentTasksExecutor<?> holder;
Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor<?> holder) {
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTaskState state, PersistentTasksExecutor<?> holder) {
this.params = params;
this.task = task;
this.status = status;
this.state = state;
this.holder = holder;
}
}
@ -352,11 +352,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
@Override
public <Params extends PersistentTaskParams> void executeTask(Params params,
Task.Status status,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
executions.add(new Execution(params, task, status, executor));
public <Params extends PersistentTaskParams> void executeTask(final Params params,
final PersistentTaskState state,
final AllocatedPersistentTask task,
final PersistentTasksExecutor<Params> executor) {
executions.add(new Execution(params, task, state, executor));
}
public Execution get(int i) {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
@ -100,16 +99,17 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
);
}
@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent)
new NamedXContentRegistry.Entry(PersistentTaskParams.class,
new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class,
new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent)
);
}
@ -221,22 +221,22 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
}
}
public static class Status implements Task.Status {
public static class State implements PersistentTaskState {
private final String phase;
public static final ConstructingObjectParser<Status, Void> STATUS_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new Status((String) args[0]));
public static final ConstructingObjectParser<State, Void> STATE_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new State((String) args[0]));
static {
STATUS_PARSER.declareString(constructorArg(), new ParseField("phase"));
STATE_PARSER.declareString(constructorArg(), new ParseField("phase"));
}
public Status(String phase) {
public State(String phase) {
this.phase = requireNonNull(phase, "Phase cannot be null");
}
public Status(StreamInput in) throws IOException {
public State(StreamInput in) throws IOException {
phase = in.readString();
}
@ -253,11 +253,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
return builder;
}
public static Task.Status fromXContent(XContentParser parser) throws IOException {
return STATUS_PARSER.parse(parser, null);
public static PersistentTaskState fromXContent(XContentParser parser) throws IOException {
return STATE_PARSER.parse(parser, null);
}
@Override
public boolean isFragment() {
return false;
@ -276,10 +275,10 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
// Implements equals and hashcode for testing
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != Status.class) {
if (obj == null || obj.getClass() != State.class) {
return false;
}
Status other = (Status) obj;
State other = (State) obj;
return phase.equals(other.phase);
}
@ -289,7 +288,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
}
}
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestParams> {
public static final String NAME = "cluster:admin/persistent/test";
@ -317,7 +315,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) {
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, PersistentTaskState state) {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
@ -340,9 +338,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
} else if ("update_status".equals(testTask.getOperation())) {
testTask.setOperation(null);
CountDownLatch latch = new CountDownLatch(1);
Status newStatus = new Status("phase " + phase.incrementAndGet());
logger.info("updating the task status to {}", newStatus);
task.updatePersistentStatus(newStatus, new ActionListener<PersistentTask<?>>() {
State newState = new State("phase " + phase.incrementAndGet());
logger.info("updating the task state to {}", newState);
task.updatePersistentTaskState(newState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
logger.info("updating was successful");
@ -540,5 +538,4 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
}
}

View File

@ -20,9 +20,8 @@ package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.State;
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request;
@ -32,7 +31,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
@Override
protected Request createTestInstance() {
return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10)));
return new Request(UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10)));
}
@Override
@ -43,7 +42,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new)
new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new)
));
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.license.PostStartBasicAction;
import org.elasticsearch.license.PostStartTrialAction;
import org.elasticsearch.license.PutLicenseAction;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -89,7 +90,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage;
import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
import org.elasticsearch.xpack.core.rollup.RollupField;
@ -325,9 +326,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
OpenJobAction.JobParams::new),
// ML - Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new),
new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream),
// ML - Task states
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream),
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.MACHINE_LEARNING,
MachineLearningFeatureSetUsage::new),
// monitoring
@ -350,7 +351,8 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
// rollup
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new)
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new)
);
}
@ -365,9 +367,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// ML - Task statuses
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent),
// ML - Task states
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(JobTaskState.NAME), JobTaskState::fromXContent),
// watcher
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(WatcherMetaData.TYPE),
WatcherMetaData::fromXContent),
@ -375,8 +377,12 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(LicensesMetaData.TYPE),
LicensesMetaData::fromXContent),
//rollup
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), RollupJob::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent)
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME),
RollupJob::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME),
RollupJobStatus::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(RollupJobStatus.NAME),
RollupJobStatus::fromXContent)
);
}

View File

@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@ -402,9 +402,9 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
if (allowDeleteOpenJob == false) {
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
if (jobTask != null) {
JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus();
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
+ ((jobTaskStatus == null) ? JobState.OPENING : jobTaskStatus.getState()));
+ ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState()));
}
}
Job.Builder jobBuilder = new Job.Builder(job);
@ -448,7 +448,7 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null) {
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
JobTaskState jobTaskState = (JobTaskState) task.getState();
if (jobTaskState == null) {
return JobState.OPENING;
}
@ -460,8 +460,8 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getStatus() != null) {
return (DatafeedState) task.getStatus();
if (task != null && task.getState() != null) {
return (DatafeedState) task.getState();
} else {
// If we haven't started a datafeed then there will be no persistent task,
// which is the same as if the datafeed was't started

View File

@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import java.io.IOException;
@ -20,7 +20,7 @@ import java.util.Locale;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public enum DatafeedState implements Task.Status {
public enum DatafeedState implements PersistentTaskState {
STARTED, STOPPED, STARTING, STOPPING;

View File

@ -12,25 +12,25 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class JobTaskStatus implements Task.Status {
public class JobTaskState implements PersistentTaskState {
public static final String NAME = OpenJobAction.TASK_NAME;
private static ParseField STATE = new ParseField("state");
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static final ConstructingObjectParser<JobTaskStatus, Void> PARSER =
private static final ConstructingObjectParser<JobTaskState, Void> PARSER =
new ConstructingObjectParser<>(NAME,
args -> new JobTaskStatus((JobState) args[0], (Long) args[1]));
args -> new JobTaskState((JobState) args[0], (Long) args[1]));
static {
PARSER.declareField(constructorArg(), p -> {
@ -42,7 +42,7 @@ public class JobTaskStatus implements Task.Status {
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
}
public static JobTaskStatus fromXContent(XContentParser parser) {
public static JobTaskState fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
} catch (IOException e) {
@ -53,12 +53,12 @@ public class JobTaskStatus implements Task.Status {
private final JobState state;
private final long allocationId;
public JobTaskStatus(JobState state, long allocationId) {
public JobTaskState(JobState state, long allocationId) {
this.state = Objects.requireNonNull(state);
this.allocationId = allocationId;
}
public JobTaskStatus(StreamInput in) throws IOException {
public JobTaskState(StreamInput in) throws IOException {
state = JobState.fromStream(in);
allocationId = in.readLong();
}
@ -100,7 +100,7 @@ public class JobTaskStatus implements Task.Status {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobTaskStatus that = (JobTaskStatus) o;
JobTaskState that = (JobTaskState) o;
return state == that.state &&
Objects.equals(allocationId, that.allocationId);
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.tasks.Task;
import java.io.IOException;
@ -30,7 +31,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
* indexer's current position. When the allocated task updates its status,
* it is providing a new version of this.
*/
public class RollupJobStatus implements Task.Status {
public class RollupJobStatus implements Task.Status, PersistentTaskState {
public static final String NAME = "xpack/rollup/job";
private final IndexerState state;
@ -73,7 +74,7 @@ public class RollupJobStatus implements Task.Status {
currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null;
}
public IndexerState getState() {
public IndexerState getIndexerState() {
return state;
}

View File

@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -256,8 +256,8 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
@Override
protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask,
ActionListener<CloseJobAction.Response> listener) {
JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId());
jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
// we need to fork because we are now on a network threadpool and closeJob method may take a while to complete:
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override

View File

@ -39,12 +39,12 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -57,7 +57,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
@ -208,7 +208,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
persistentTasks.findTasks(OpenJobAction.TASK_NAME,
task -> node.getId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus();
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
JobState jobState;
if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING
// previous executor node failed and current executor node didn't have the chance to set job status to OPENING
@ -675,14 +675,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) {
protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, PersistentTaskState state) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
JobTaskStatus jobStateStatus = (JobTaskStatus) status;
JobTaskState jobTaskState = (JobTaskState) state;
// If the job is failed then the Persistent Task Service will
// try to restart it on a node restart. Exiting here leaves the
// job in the failed state and it must be force closed.
if (jobStateStatus != null && jobStateStatus.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
return;
}
@ -766,8 +766,8 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
JobState jobState = JobState.CLOSED;
if (persistentTask != null) {
JobTaskStatus jobStateStatus = (JobTaskStatus) persistentTask.getStatus();
jobState = jobStateStatus == null ? JobState.OPENING : jobStateStatus.getState();
JobTaskState jobTaskState = (JobTaskState) persistentTask.getState();
jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState();
PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
// This logic is only appropriate when opening a job, not when reallocating following a failure,

View File

@ -23,8 +23,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -274,8 +274,9 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
}
@Override
protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, StartDatafeedAction.DatafeedParams params,
Task.Status status) {
protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTask,
final StartDatafeedAction.DatafeedParams params,
final PersistentTaskState state) {
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
datafeedTask.datafeedManager = datafeedManager;
datafeedManager.run(datafeedTask,
@ -373,7 +374,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
return true;
}
DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus();
DatafeedState datafeedState = (DatafeedState) persistentTask.getState();
return datafeedState == DatafeedState.STARTED;
}
}

View File

@ -222,10 +222,10 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
}
@Override
protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTaskTask,
protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask,
ActionListener<StopDatafeedAction.Response> listener) {
DatafeedState taskStatus = DatafeedState.STOPPING;
datafeedTaskTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> {
DatafeedState taskState = DatafeedState.STOPPING;
datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> {
// we need to fork because we are now on a network threadpool
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
@ -235,7 +235,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
@Override
protected void doRun() throws Exception {
datafeedTaskTask.stop("stop_datafeed (api)", request.getStopTimeout());
datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout());
listener.onResponse(new StopDatafeedAction.Response(true));
}
});
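The call pattern above recurs throughout this change: build the new persisted state, pass it to the renamed updatePersistentTaskState(), and continue from the listener. A condensed, hypothetical helper showing just that shape (the real handler forks to the ML utility thread pool before doing the actual stop work, as in the hunk above):

// Sketch only; the helper name and logging are made up, the types are those used above.
void markDatafeedStopping(final TransportStartDatafeedAction.DatafeedTask datafeedTask, final Logger logger) {
    datafeedTask.updatePersistentTaskState(DatafeedState.STOPPING, ActionListener.wrap(
        persistentTask -> logger.debug("datafeed task state updated to [{}]", DatafeedState.STOPPING),
        e -> logger.warn("failed to update datafeed task state to [stopping]", e)));
}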

View File

@ -88,7 +88,7 @@ public class DatafeedManager extends AbstractComponent {
datafeedJob -> {
Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler);
runningDatafeedsOnThisNode.put(task.getAllocationId(), holder);
task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
taskRunner.runWhenJobIsOpened(task);

View File

@ -12,12 +12,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import java.util.List;
import java.util.Objects;
@ -64,11 +64,11 @@ public class DatafeedNodeSelector {
PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector();
priorityFailureCollector.add(verifyIndicesActive(datafeed));
JobTaskStatus taskStatus = null;
JobTaskState jobTaskState = null;
JobState jobState = JobState.CLOSED;
if (jobTask != null) {
taskStatus = (JobTaskStatus) jobTask.getStatus();
jobState = taskStatus == null ? JobState.OPENING : taskStatus.getState();
jobTaskState = (JobTaskState) jobTask.getState();
jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState();
}
if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
@ -78,8 +78,8 @@ public class DatafeedNodeSelector {
priorityFailureCollector.add(new AssignmentFailure(reason, true));
}
if (taskStatus != null && taskStatus.isStatusStale(jobTask)) {
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale";
if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) {
String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale";
priorityFailureCollector.add(new AssignmentFailure(reason, true));
}
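The null handling here is the convention adopted across this change for reading persisted state back out of a persistent task: a task that exists but has not yet published any state is treated as still opening. Roughly, as a hypothetical helper:

// Sketch of the convention used above (the helper name is made up).
static JobState jobStateOf(@Nullable final PersistentTasksCustomMetaData.PersistentTask<?> jobTask) {
    if (jobTask == null) {
        return JobState.CLOSED;               // no persistent task at all: the job is closed
    }
    final JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
    return jobTaskState == null ? JobState.OPENING : jobTaskState.getState();
}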

View File

@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
@ -623,8 +623,8 @@ public class AutodetectProcessManager extends AbstractComponent {
}
void setJobState(JobTask jobTask, JobState state) {
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId());
@ -638,8 +638,8 @@ public class AutodetectProcessManager extends AbstractComponent {
}
void setJobState(JobTask jobTask, JobState state, CheckedConsumer<Exception, IOException> handler) {
JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId());
jobTask.updatePersistentStatus(taskStatus, new ActionListener<PersistentTask<?>>() {
JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
jobTask.updatePersistentTaskState(jobTaskState, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> persistentTask) {
try {

View File

@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -363,7 +363,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
assertEquals(JobState.OPENING, MlMetadata.getJobState("foo", tasksBuilder.build()));
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("foo"), new JobTaskStatus(JobState.OPENED, tasksBuilder.getLastAllocationId()));
tasksBuilder.updateTaskState(MlMetadata.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId()));
assertEquals(JobState.OPENED, MlMetadata.getJobState("foo", tasksBuilder.build()));
}

View File

@ -314,7 +314,7 @@ public class TransportCloseJobActionTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasks) {
tasks.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment"));
tasks.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state);
tasks.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state);
}
}

View File

@ -42,7 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -329,7 +329,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null);
tasksBuilder.updateTaskState(MlMetadata.jobTaskId("job_id6"), null);
tasks = tasksBuilder.build();
csBuilder = ClusterState.builder(cs);
@ -630,7 +630,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId),
new Assignment(nodeId, "test assignment"));
if (jobState != null) {
builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId()));
builder.updateTaskState(MlMetadata.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId()));
}
}

View File

@ -31,7 +31,7 @@ public class TransportStopDatafeedActionTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder.addTask(MLMetadataField.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", ""));
tasksBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED);
tasksBuilder.updateTaskState(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED);
tasksBuilder.build();
Job job = createDatafeedJob().build(new Date());
@ -121,6 +121,6 @@ public class TransportStopDatafeedActionTests extends ESTestCase {
taskBuilder.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME,
new StartDatafeedAction.DatafeedParams(datafeedId, startTime),
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
taskBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state);
taskBuilder.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state);
}
}

View File

@ -378,7 +378,7 @@ public class DatafeedManagerTests extends ESTestCase {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
listener.onResponse(mock(PersistentTask.class));
return null;
}).when(task).updatePersistentStatus(any(), any());
}).when(task).updatePersistentTaskState(any(), any());
return task;
}
@ -394,7 +394,7 @@ public class DatafeedManagerTests extends ESTestCase {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
listener.onResponse(mock(PersistentTask.class));
return null;
}).when(task).updatePersistentStatus(any(), any());
}).when(task).updatePersistentTaskState(any(), any());
return task;
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.junit.Before;
@ -255,20 +255,20 @@ public class DatafeedNodeSelectorTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder);
// Set to lower allocationId, so job task is stale:
tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0));
tasksBuilder.updateTaskState(MlMetadata.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0));
tasks = tasksBuilder.build();
givenClusterState("foo", 1, 0);
PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode();
assertNull(result.getExecutorNode());
assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale",
assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale",
result.getExplanation());
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated());
assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation "
+ "[cannot start datafeed [datafeed_id], job [job_id] status is stale]"));
+ "[cannot start datafeed [datafeed_id], job [job_id] state is stale]"));
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder);

View File

@ -39,7 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
@ -211,9 +211,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
assertEquals(JobState.OPENED, jobTaskStatus.getState());
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
assertEquals(JobState.OPENED, jobTaskState.getState());
});
logger.info("stop the only running ml node");
@ -264,7 +264,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
for (DiscoveryNode node : event.state().nodes()) {
Collection<PersistentTask<?>> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> {
JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus();
JobTaskState jobTaskState = (JobTaskState) task.getState();
return node.getId().equals(task.getExecutorNode()) &&
(jobTaskState == null || jobTaskState.isStatusStale(task));
});
@ -396,9 +396,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true"));
assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
assertEquals(expectedState, jobTaskStatus.getState());
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
assertEquals(expectedState, jobTaskState.getState());
} else {
assertNull(task.getExecutorNode());
}
@ -411,9 +411,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
assertEquals(numJobs, tasks.taskMap().size());
for (PersistentTask<?> task : tasks.taskMap().values()) {
assertNotNull(task.getExecutorNode());
JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus();
assertNotNull(jobTaskStatus);
assertEquals(JobState.OPENED, jobTaskStatus.getState());
JobTaskState jobTaskState = (JobTaskState) task.getState();
assertNotNull(jobTaskState);
assertEquals(JobState.OPENED, jobTaskState.getState());
}
};
}

View File

@ -20,7 +20,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -58,7 +58,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
assertEquals(1, tasks.taskMap().size());
// now just double check that the first job is still opened:
PersistentTasksCustomMetaData.PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("close-failed-job-1"));
assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState());
assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState());
}
public void testSingleNode() throws Exception {

View File

@ -9,22 +9,22 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
public class JobTaskStatusTests extends AbstractSerializingTestCase<JobTaskStatus> {
public class JobTaskStateTests extends AbstractSerializingTestCase<JobTaskState> {
@Override
protected JobTaskStatus createTestInstance() {
return new JobTaskStatus(randomFrom(JobState.values()), randomLong());
protected JobTaskState createTestInstance() {
return new JobTaskState(randomFrom(JobState.values()), randomLong());
}
@Override
protected Writeable.Reader<JobTaskStatus> instanceReader() {
return JobTaskStatus::new;
protected Writeable.Reader<JobTaskState> instanceReader() {
return JobTaskState::new;
}
@Override
protected JobTaskStatus doParseInstance(XContentParser parser) {
return JobTaskStatus.fromXContent(parser);
protected JobTaskState doParseInstance(XContentParser parser) {
return JobTaskState.fromXContent(parser);
}
}

View File

@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
@ -199,7 +199,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
manager.openJob(jobTask, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any());
}
public void testOpenJob_exceedMaxNumJobs() {

View File

@ -19,6 +19,7 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
@ -62,7 +63,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, Status status) {
protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, PersistentTaskState state) {
RollupJobTask rollupJobTask = (RollupJobTask) task;
SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(),
new CronSchedule(params.getConfig().getCron()));
@ -80,7 +81,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
PersistentTasksCustomMetaData.PersistentTask<RollupJob> persistentTask,
Map<String, String> headers) {
return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(),
(RollupJobStatus) persistentTask.getStatus(), client, schedulerEngine, threadPool, headers);
(RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers);
}
}
@ -115,15 +116,15 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
}
@Override
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
if (state.equals(IndexerState.ABORTING)) {
protected void doSaveState(IndexerState indexerState, Map<String, Object> position, Runnable next) {
if (indexerState.equals(IndexerState.ABORTING)) {
// If we're aborting, just invoke `next` (which is likely an onFailure handler)
next.run();
} else {
// Otherwise, attempt to persist our state
final RollupJobStatus status = new RollupJobStatus(state, getPosition());
logger.debug("Updating persistent status of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]");
updatePersistentStatus(status, ActionListener.wrap(task -> next.run(), exc -> next.run()));
final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition());
logger.debug("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + indexerState.toString() + "]");
updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> next.run()));
}
}
@ -148,7 +149,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
private final ThreadPool threadPool;
private final RollupIndexer indexer;
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus status,
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers);
this.job = job;
@ -158,16 +159,17 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
// If state is not null, we are resuming rather than starting fresh.
Map<String, Object> initialPosition = null;
IndexerState initialState = IndexerState.STOPPED;
if (status != null) {
logger.debug("We have existing status, setting state to [" + status.getState() + "] " +
"and current position to [" + status.getPosition() + "] for job [" + job.getConfig().getId() + "]");
if (status.getState().equals(IndexerState.INDEXING)) {
if (state != null) {
final IndexerState existingState = state.getIndexerState();
logger.debug("We have existing state, setting state to [" + existingState + "] " +
"and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
if (existingState.equals(IndexerState.INDEXING)) {
/*
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
* it is indexing but without the actual indexing thread running.
*/
initialState = IndexerState.STARTED;
} else if (status.getState().equals(IndexerState.ABORTING) || status.getState().equals(IndexerState.STOPPING)) {
} else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
// but it won't be running, and won't delete itself either. Safest option.
@ -175,9 +177,9 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
// to restore as STOPPED
initialState = IndexerState.STOPPED;
} else {
initialState = status.getState();
initialState = existingState;
}
initialPosition = status.getPosition();
initialPosition = state.getPosition();
}
this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
@ -227,20 +229,20 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
+ " state was [" + newState + "]"));
return;
}
final RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
logger.debug("Updating status for rollup job [" + job.getConfig().getId() + "] to [" + status.getState() + "][" +
status.getPosition() + "]");
updatePersistentStatus(status,
final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" +
state.getPosition() + "]");
updatePersistentTaskState(state,
ActionListener.wrap(
(task) -> {
logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() + "] to ["
+ status.getState() + "][" + status.getPosition() + "]");
logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + "] to ["
+ state.getIndexerState() + "][" + state.getPosition() + "]");
listener.onResponse(new StartRollupJobAction.Response(true));
},
(exc) -> {
listener.onFailure(
new ElasticsearchException("Error while updating status for rollup job [" + job.getConfig().getId()
+ "] to [" + status.getState() + "].", exc)
new ElasticsearchException("Error while updating state for rollup job [" + job.getConfig().getId()
+ "] to [" + state.getIndexerState() + "].", exc)
);
}
)
@ -268,17 +270,17 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
case STOPPING:
// update the persistent state only if there is no background job running,
// otherwise the state is updated by the indexer when the background job detects the STOPPING state.
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition());
updatePersistentStatus(status,
RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition());
updatePersistentTaskState(state,
ActionListener.wrap(
(task) -> {
logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId()
+ "] to [" + status.getState() + "]");
logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId()
+ "] to [" + state.getIndexerState() + "]");
listener.onResponse(new StopRollupJobAction.Response(true));
},
(exc) -> {
listener.onFailure(new ElasticsearchException("Error while updating status for rollup job ["
+ job.getConfig().getId() + "] to [" + status.getState() + "].", exc));
listener.onFailure(new ElasticsearchException("Error while updating state for rollup job ["
+ job.getConfig().getId() + "] to [" + state.getIndexerState() + "].", exc));
})
);
break;
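Taken together with the constructor hunk above, the resume path can be summarised by a small hypothetical function: a persisted RollupJobStatus is mapped back to a safe initial indexer state before the indexer is rebuilt.

// Condensed, hypothetical restatement of the restore logic shown above.
static IndexerState initialIndexerState(@Nullable final RollupJobStatus persisted) {
    if (persisted == null) {
        return IndexerState.STOPPED;          // starting fresh
    }
    final IndexerState existing = persisted.getIndexerState();
    if (existing.equals(IndexerState.INDEXING)) {
        return IndexerState.STARTED;          // no indexing thread is running yet after a restore
    }
    if (existing.equals(IndexerState.ABORTING) || existing.equals(IndexerState.STOPPING)) {
        return IndexerState.STOPPED;          // play it safe; an admin can restart the job
    }
    return existing;
}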

View File

@ -7,10 +7,8 @@ package org.elasticsearch.xpack.rollup;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
@ -27,7 +25,6 @@ import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Netty4Plugin;
@ -104,7 +101,7 @@ public class RollupIT extends ESIntegTestCase {
}
@Before
public void createIndex() throws Exception {
public void createIndex() {
client().admin().indices().prepareCreate("test-1").addMapping("doc", "{\"doc\": {\"properties\": {" +
"\"date_histo\": {\"type\": \"date\"}, " +
"\"histo\": {\"type\": \"integer\"}, " +
@ -125,7 +122,7 @@ public class RollupIT extends ESIntegTestCase {
}
}
}
BulkResponse response = bulk.get();
bulk.get();
client().admin().indices().prepareRefresh("test-1").get();
}
@ -195,27 +192,23 @@ public class RollupIT extends ESIntegTestCase {
// Make sure it started
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern");
if (rollupJobStatus == null) {;
RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern");
if (rollupJobStatus == null) {
fail("null");
}
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING));
}, 60, TimeUnit.SECONDS);
// And wait for it to finish
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern");
RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern");
if (rollupJobStatus == null) {
fail("null");
}
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null);
}, 60, TimeUnit.SECONDS);
@ -274,23 +267,20 @@ public class RollupIT extends ESIntegTestCase {
// Make sure it started
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1");
RollupJobStatus rollupJobStatus = getRollupJobStatus("job1");
if (rollupJobStatus == null) {
fail("null");
}
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING));
}, 60, TimeUnit.SECONDS);
//but not the other task
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2");
RollupJobStatus rollupJobStatus = getRollupJobStatus("job2");
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
assertTrue(state.equals(IndexerState.STOPPED));
}, 60, TimeUnit.SECONDS);
@ -301,9 +291,7 @@ public class RollupIT extends ESIntegTestCase {
// Make sure the first job's task is gone
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1");
RollupJobStatus rollupJobStatus = getRollupJobStatus("job1");
assertTrue(rollupJobStatus == null);
}, 60, TimeUnit.SECONDS);
@ -320,10 +308,9 @@ public class RollupIT extends ESIntegTestCase {
// and still STOPPED
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2");
RollupJobStatus rollupJobStatus = getRollupJobStatus("job2");
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
assertTrue(state.equals(IndexerState.STOPPED));
}, 60, TimeUnit.SECONDS);
}
@ -404,19 +391,17 @@ public class RollupIT extends ESIntegTestCase {
Assert.assertThat(response.isStarted(), equalTo(true));
ESTestCase.assertBusy(() -> {
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId);
RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId);
if (rollupJobStatus == null) {
fail("null");
}
IndexerState state = rollupJobStatus.getState();
IndexerState state = rollupJobStatus.getIndexerState();
logger.error("state: [" + state + "]");
assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null);
}, 60, TimeUnit.SECONDS);
ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get();
RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId);
RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId);
if (rollupJobStatus == null) {
Assert.fail("rollup job status should not be null");
}
@ -481,11 +466,13 @@ public class RollupIT extends ESIntegTestCase {
}
}
private RollupJobStatus getRollupJobStatus(ListTasksResponse tasksResponse, String taskId) {
for (TaskInfo task : tasksResponse.getTasks()) {
if (task.getDescription().equals("rollup_" + taskId)) {
return ((RollupJobStatus) task.getStatus());
}
private RollupJobStatus getRollupJobStatus(final String taskId) {
final GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(taskId);
final GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, request).actionGet();
if (response.getJobs() != null && response.getJobs().isEmpty() == false) {
assertThat("Expect 1 rollup job with id " + taskId, response.getJobs().size(), equalTo(1));
return response.getJobs().iterator().next().getStatus();
}
return null;
}
@ -498,13 +485,13 @@ public class RollupIT extends ESIntegTestCase {
for (GetRollupJobsAction.JobWrapper job : response.getJobs()) {
StopRollupJobAction.Request stopRequest = new StopRollupJobAction.Request(job.getJob().getId());
try {
StopRollupJobAction.Response stopResponse = client().execute(StopRollupJobAction.INSTANCE, stopRequest).get();
client().execute(StopRollupJobAction.INSTANCE, stopRequest).get();
} catch (ElasticsearchException e) {
//
}
DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request(job.getJob().getId());
DeleteRollupJobAction.Response deleteResponse = client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get();
client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get();
}
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
@ -64,7 +65,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
}
@ -77,7 +78,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
}
@ -90,7 +91,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
}
@ -103,7 +104,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
}
@ -116,7 +117,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
}
@ -128,7 +129,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
}
@ -140,7 +141,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -172,13 +173,14 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(status, instanceOf(RollupJobStatus.class));
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuse > 2 times");
}
@ -187,7 +189,7 @@ public class RollupJobTaskTests extends ESTestCase {
counter.incrementAndGet();
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
CountDownLatch latch = new CountDownLatch(1);
@ -195,7 +197,7 @@ public class RollupJobTaskTests extends ESTestCase {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
@ -207,7 +209,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.await(3, TimeUnit.SECONDS);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@ -248,14 +250,15 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(status, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -264,7 +267,7 @@ public class RollupJobTaskTests extends ESTestCase {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
@ -285,14 +288,15 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(status, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@ -301,7 +305,7 @@ public class RollupJobTaskTests extends ESTestCase {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
@ -313,7 +317,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.await(3, TimeUnit.SECONDS);
task.triggered(new SchedulerEngine.Event("unrelated", 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); // Should still be started, not INDEXING
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
}
public void testTrigger() throws InterruptedException {
@ -325,14 +329,15 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(status, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
CountDownLatch latch = new CountDownLatch(1);
@ -340,7 +345,7 @@ public class RollupJobTaskTests extends ESTestCase {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
@ -352,7 +357,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.await(3, TimeUnit.SECONDS);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
}
@ -392,11 +397,12 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
Integer counterValue = counter.getAndIncrement();
if (counterValue == 0) {
assertThat(status, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
assertThat(taskState, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
} else if (counterValue == 1) {
@ -405,14 +411,14 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
started.set(true);
}
@ -424,7 +430,7 @@ public class RollupJobTaskTests extends ESTestCase {
ESTestCase.awaitBusy(started::get);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// Allow search response to return now
latch.countDown();
@ -475,11 +481,12 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
Integer counterValue = counter.getAndIncrement();
if (counterValue == 0) {
assertThat(status, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
assertThat(taskState, instanceOf(RollupJobStatus.class));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
} else if (counterValue == 1) {
@ -488,14 +495,14 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
started.set(true);
}
@ -507,7 +514,7 @@ public class RollupJobTaskTests extends ESTestCase {
ESTestCase.awaitBusy(started::get);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
// Allow search response to return now
latch.countDown();
@ -524,7 +531,7 @@ public class RollupJobTaskTests extends ESTestCase {
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
status, client, schedulerEngine, pool, Collections.emptyMap());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
CountDownLatch latch = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@ -553,15 +560,16 @@ public class RollupJobTaskTests extends ESTestCase {
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(status, instanceOf(RollupJobStatus.class));
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuse > 3 times");
}
@ -571,7 +579,7 @@ public class RollupJobTaskTests extends ESTestCase {
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
CountDownLatch latch = new CountDownLatch(1);
@ -579,7 +587,7 @@ public class RollupJobTaskTests extends ESTestCase {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
@ -591,7 +599,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.await(3, TimeUnit.SECONDS);
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@ -642,7 +650,7 @@ public class RollupJobTaskTests extends ESTestCase {
latch.countDown();
}
};
assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
task.onCancelled();
task.stop(new ActionListener<StopRollupJobAction.Response>() {
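Taken together, the RollupJobTaskTests hunks above all make the same two substitutions: the test-only override of updatePersistentStatus(Task.Status, ...) becomes updatePersistentTaskState(PersistentTaskState, ...), and assertions on RollupJobStatus.getState() become assertions on RollupJobStatus.getIndexerState(). A minimal sketch of the new override shape, assuming the test fixtures visible in the hunks (job, client, schedulerEngine, pool) and the usual static imports for assertThat/instanceOf/equalTo; the asserted IndexerState and the listener response are illustrative values copied from the hunks, not a canonical implementation:

    RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
            null, client, schedulerEngine, pool, Collections.emptyMap()) {
        @Override
        public void updatePersistentTaskState(PersistentTaskState taskState,
                                              ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
            // The persisted state is now modelled as a PersistentTaskState rather than a Task.Status,
            // so the test casts to the concrete RollupJobStatus before asserting on the indexer state.
            assertThat(taskState, instanceOf(RollupJobStatus.class));
            assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
            listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
                    new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
        }
    };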


@ -26,13 +26,13 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.transport.Netty4Plugin;
@ -70,7 +70,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -449,8 +449,8 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
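For context on the hunk above: the ML task states are now registered in the test's NamedWriteableRegistry under PersistentTaskState.class instead of Task.Status.class, and JobTaskStatus is renamed to JobTaskState. A minimal sketch of that wiring in isolation, assuming the imports shown earlier in this file; only the two entries are taken from the hunk, the surrounding list handling is generic scaffolding:

    List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
    // Persistent task state implementations are registered under the new PersistentTaskState category
    entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
    entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
    final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);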