Persistent Tasks: PersistentTaskRequest -> PersistTaskParams (elastic/x-pack-elasticsearch#1057)

Removes the last pieces of ActionRequest from PersistentTaskRequest and renames it into PersistTaskParams, which is now just an interface that extends NamedWriteable and ToXContent.

Original commit: elastic/x-pack-elasticsearch@5a298b924f
This commit is contained in:
Igor Motov 2017-04-12 09:58:15 -04:00 committed by GitHub
parent a0dcae97db
commit 253340a597
26 changed files with 347 additions and 300 deletions

View File

@ -123,8 +123,8 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction;
import org.elasticsearch.xpack.persistent.PersistentTasksClusterService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry;
@ -216,8 +216,8 @@ public class MachineLearning implements ActionPlugin {
PersistentTasksCustomMetaData::readDiffFrom),
// Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new),
// Task statuses
new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
@ -236,9 +236,9 @@ public class MachineLearning implements ActionPlugin {
PersistentTasksCustomMetaData::fromXContent),
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(StartDatafeedAction.NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.NAME),
StartDatafeedAction.Request::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(OpenJobAction.NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.NAME),
OpenJobAction.Request::fromXContent),
// Task statuses
@ -422,7 +422,7 @@ public class MachineLearning implements ActionPlugin {
new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class),

View File

@ -79,7 +79,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt
continue;
}
if (OpenJobAction.NAME.equals(currentTask.getTaskName())) {
String jobId = ((OpenJobAction.Request) currentTask.getRequest()).getJobId();
String jobId = ((OpenJobAction.Request) currentTask.getParams()).getJobId();
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else {
@ -87,7 +87,7 @@ public class MlAssignmentNotifier extends AbstractComponent implements ClusterSt
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (StartDatafeedAction.NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.Request) currentTask.getRequest()).getDatafeedId();
String datafeedId = ((StartDatafeedAction.Request) currentTask.getParams()).getDatafeedId();
MlMetadata mlMetadata = event.state().getMetaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
if (currentAssignment.getExecutorNode() == null) {

View File

@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
@ -49,7 +50,7 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -86,7 +87,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
return new Response();
}
public static class Request extends PersistentTaskRequest {
public static class Request extends ActionRequest implements PersistentTaskParams {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField TIMEOUT = new ParseField("timeout");
@ -152,11 +153,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
this.timeout = timeout;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new JobTask(getJobId(), id, type, action, parentTaskId);
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -432,6 +428,12 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
});
}
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Request> persistentTask) {
return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId);
}
void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(),
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);

View File

@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
@ -56,7 +57,7 @@ import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -94,7 +95,7 @@ public class StartDatafeedAction
return new Response();
}
public static class Request extends PersistentTaskRequest implements ToXContent {
public static class Request extends ActionRequest implements PersistentTaskParams, ToXContent {
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
@ -190,11 +191,6 @@ public class StartDatafeedAction
return e;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new DatafeedTask(id, type, action, parentTaskId, this);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -454,6 +450,11 @@ public class StartDatafeedAction
});
}
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Request> persistentTask) {
return new DatafeedTask(id, type, action, parentTaskId, persistentTask.getParams());
}
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
@ -20,10 +21,10 @@ public class NodePersistentTasksExecutor {
this.threadPool = threadPool;
}
public <Request extends PersistentTaskRequest> void executeTask(Request request,
AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
@ -33,7 +34,7 @@ public class NodePersistentTasksExecutor {
@Override
protected void doRun() throws Exception {
try {
action.nodeOperation(task, request);
executor.nodeOperation(task, params);
} catch (Exception ex) {
task.markAsFailed(ex);
}

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
/**
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends NamedWriteable, ToXContent {
}

View File

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
/**
* Base class for a request for a persistent task
*/
public abstract class PersistentTaskRequest extends ActionRequest implements NamedWriteable, ToXContent {
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new AllocatedPersistentTask(id, type, action, getDescription(), parentTaskId);
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@ -44,11 +45,11 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* Creates a new persistent task on master node
*
* @param action the action name
* @param request request
* @param params params
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
ActionListener<PersistentTask<?>> listener) {
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -56,10 +57,10 @@ public class PersistentTasksClusterService extends AbstractComponent implements
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request);
validate(action, clusterService.state(), params);
final Assignment assignment;
assignment = getAssignement(action, currentState, request);
return update(currentState, builder.addTask(taskId, action, request, assignment));
assignment = getAssignement(action, currentState, params);
return update(currentState, builder.addTask(taskId, action, params, assignment));
}
@Override
@ -191,14 +192,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements
});
}
private <Request extends PersistentTaskRequest> Assignment getAssignement(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(request, currentState);
private <Params extends PersistentTaskParams> Assignment getAssignement(String taskName, ClusterState currentState,
@Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
return persistentTasksExecutor.getAssignment(params, currentState);
}
private <Request extends PersistentTaskRequest> void validate(String taskName, ClusterState currentState, Request request) {
PersistentTasksExecutor<Request> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(request, currentState);
private <Params extends PersistentTaskParams> void validate(String taskName, ClusterState currentState, @Nullable Params params) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
persistentTasksExecutor.validate(params, currentState);
}
@Override
@ -215,7 +217,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
}
interface ExecutorNodeDecider {
<Request extends PersistentTaskRequest> Assignment getAssignment(String action, ClusterState currentState, Request request);
<Params extends PersistentTaskParams> Assignment getAssignment(String action, ClusterState currentState, Params params);
}
static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) {
@ -231,7 +233,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
if (taskInProgress.needsReassignment(event.state().nodes())) {
// there is an unassigned task or task with a disappeared node - we need to try assigning it
if (Objects.equals(taskInProgress.getAssignment(),
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getRequest())) == false) {
decider.getAssignment(taskInProgress.getTaskName(), event.state(), taskInProgress.getParams())) == false) {
// it looks like a assignment for at least one task is possible - let's trigger reassignment
reassignmentRequired = true;
break;
@ -276,7 +278,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
for (PersistentTask<?> task : tasks.tasks()) {
if (task.needsReassignment(nodes)) {
// there is an unassigned task - we need to try assigning it
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getRequest());
Assignment assignment = decider.getAssignment(task.getTaskName(), clusterState, task.getParams());
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace("reassigning task {} from node {} to node {}", task.getId(),
task.getAssignment().getExecutorNode(), assignment.getExecutorNode());

View File

@ -63,14 +63,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private static final ObjectParser<Builder, Void> PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new);
private static final ObjectParser<TaskBuilder<PersistentTaskRequest>, Void> PERSISTENT_TASK_PARSER =
private static final ObjectParser<TaskBuilder<PersistentTaskParams>, Void> PERSISTENT_TASK_PARSER =
new ObjectParser<>("tasks", TaskBuilder::new);
public static final ConstructingObjectParser<Assignment, Void> ASSIGNMENT_PARSER =
new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1]));
private static final NamedObjectParser<PersistentTaskRequest, Void> REQUEST_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskRequest.class, name, null);
private static final NamedObjectParser<PersistentTaskParams, Void> PARAMS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(PersistentTaskParams.class, name, null);
private static final NamedObjectParser<Status, Void> STATUS_PARSER =
(XContentParser p, Void c, String name) -> p.namedObject(Status.class, name, null);
@ -88,15 +88,15 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<PersistentTaskRequest> objects) -> {
(TaskBuilder<PersistentTaskParams> taskBuilder, List<PersistentTaskParams> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one request per task is allowed");
throw new IllegalArgumentException("only one params per task is allowed");
}
taskBuilder.setRequest(objects.get(0));
}, REQUEST_PARSER, new ParseField("request"));
taskBuilder.setParams(objects.get(0));
}, PARAMS_PARSER, new ParseField("params"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<Status> objects) -> {
(TaskBuilder<PersistentTaskParams> taskBuilder, List<Status> objects) -> {
if (objects.size() != 1) {
throw new IllegalArgumentException("only one status per task is allowed");
}
@ -173,10 +173,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@SuppressWarnings("unchecked")
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, String taskId) {
public static <Params extends PersistentTaskParams> PersistentTask<Params> getTaskWithId(ClusterState clusterState, String taskId) {
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
return (PersistentTask<Request>) tasks.getTask(taskId);
return (PersistentTask<Params>) tasks.getTask(taskId);
}
return null;
}
@ -230,11 +230,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/**
* A record that represents a single running persistent task
*/
public static class PersistentTask<Request extends PersistentTaskRequest> implements Writeable, ToXContent {
public static class PersistentTask<Params extends PersistentTaskParams> implements Writeable, ToXContent {
private final String id;
private final long allocationId;
private final String taskName;
private final Request request;
@Nullable
private final Params params;
@Nullable
private final Status status;
private final Assignment assignment;
@ -242,31 +243,29 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(String id, String taskName, Request request, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, request, null, assignment, null);
public PersistentTask(String id, String taskName, Params params, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, params, null, assignment, null);
}
public PersistentTask(PersistentTask<Request> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.request, task.status,
public PersistentTask(PersistentTask<Params> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.params, task.status,
assignment, task.allocationId);
}
public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.request, status,
public PersistentTask(PersistentTask<Params> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.params, status,
task.assignment, task.allocationId);
}
private PersistentTask(String id, long allocationId, String taskName, Request request,
private PersistentTask(String id, long allocationId, String taskName, Params params,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.request = request;
this.params = params;
this.status = status;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", allocationId);
}
@SuppressWarnings("unchecked")
@ -274,7 +273,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
id = in.readString();
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
params = (Params) in.readOptionalNamedWriteable(PersistentTaskParams.class);
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
@ -285,7 +284,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
out.writeString(id);
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
out.writeOptionalNamedWriteable(params);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
@ -300,7 +299,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return Objects.equals(id, that.id) &&
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
Objects.equals(params, that.params) &&
Objects.equals(status, that.status) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
@ -308,7 +307,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, request, status, assignment,
return Objects.hash(id, allocationId, taskName, params, status, assignment,
allocationIdOnLastStatusUpdate);
}
@ -329,8 +328,9 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return taskName;
}
public Request getRequest() {
return request;
@Nullable
public Params getParams() {
return params;
}
@Nullable
@ -367,25 +367,27 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xParams) throws IOException {
builder.startObject();
{
builder.field("id", id);
builder.field("name", taskName);
builder.startObject("request");
{
builder.field(request.getWriteableName(), request, params);
if (params != null) {
builder.startObject("params");
{
builder.field(params.getWriteableName(), params, xParams);
}
builder.endObject();
}
builder.endObject();
if (status != null) {
builder.startObject("status");
{
builder.field(status.getWriteableName(), status, params);
builder.field(status.getWriteableName(), status, xParams);
}
builder.endObject();
}
if (API_CONTEXT.equals(params.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
if (API_CONTEXT.equals(xParams.param(MetaData.CONTEXT_MODE_PARAM, API_CONTEXT))) {
// These are transient values that shouldn't be persisted to gateway cluster state or snapshot
builder.field("allocation_id", allocationId);
builder.startObject("assignment");
@ -409,53 +411,53 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
}
private static class TaskBuilder<Request extends PersistentTaskRequest> {
private static class TaskBuilder<Params extends PersistentTaskParams> {
private String id;
private long allocationId;
private String taskName;
private Request request;
private Params params;
private Status status;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(String id) {
public TaskBuilder<Params> setId(String id) {
this.id = id;
return this;
}
public TaskBuilder<Request> setAllocationId(long allocationId) {
public TaskBuilder<Params> setAllocationId(long allocationId) {
this.allocationId = allocationId;
return this;
}
public TaskBuilder<Request> setTaskName(String taskName) {
public TaskBuilder<Params> setTaskName(String taskName) {
this.taskName = taskName;
return this;
}
public TaskBuilder<Request> setRequest(Request request) {
this.request = request;
public TaskBuilder<Params> setParams(Params params) {
this.params = params;
return this;
}
public TaskBuilder<Request> setStatus(Status status) {
public TaskBuilder<Params> setStatus(Status status) {
this.status = status;
return this;
}
public TaskBuilder<Request> setAssignment(Assignment assignment) {
public TaskBuilder<Params> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
}
public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
public TaskBuilder<Params> setAllocationIdOnLastStatusUpdate(Long allocationIdOnLastStatusUpdate) {
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
return this;
}
public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, taskName, request, status,
public PersistentTask<Params> build() {
return new PersistentTask<>(id, allocationId, taskName, params, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
@ -486,7 +488,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) {
@ -526,7 +528,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
private <Request extends PersistentTaskRequest> Builder setTasks(List<TaskBuilder<Request>> tasks) {
private <Params extends PersistentTaskParams> Builder setTasks(List<TaskBuilder<Params>> tasks) {
for (TaskBuilder builder : tasks) {
PersistentTask<?> task = builder.build();
this.tasks.put(task.getId(), task);
@ -544,10 +546,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* <p>
* After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskId, String taskName, Request request,
Assignment assignment) {
public <Params extends PersistentTaskParams> Builder addTask(String taskId, String taskName, Params params,
Assignment assignment) {
changed = true;
PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request,
PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, params,
getNextAllocationId(), assignment));
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");

View File

@ -5,13 +5,14 @@
*/
package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.Predicate;
@ -19,7 +20,7 @@ import java.util.function.Predicate;
* An executor of tasks that can survive restart of requesting or executing node.
* These tasks are using cluster state rather than only transport service to send requests and responses.
*/
public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequest> extends AbstractComponent {
public abstract class PersistentTasksExecutor<Params extends PersistentTaskParams> extends AbstractComponent {
private final String executor;
private final String taskName;
@ -37,11 +38,11 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
/**
* Returns the node id where the request has to be executed,
* Returns the node id where the params has to be executed,
* <p>
* The default implementation returns the least loaded data node
*/
public Assignment getAssignment(Request request, ClusterState clusterState) {
public Assignment getAssignment(Params params, ClusterState clusterState) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, DiscoveryNode::isDataNode);
if (discoveryNode == null) {
return NO_NODE_FOUND;
@ -74,21 +75,36 @@ public abstract class PersistentTasksExecutor<Request extends PersistentTaskRequ
}
/**
* Checks the current cluster state for compatibility with the request
* Checks the current cluster state for compatibility with the params
* <p>
* Throws an exception if the supplied request cannot be executed on the cluster in the current state.
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/
public void validate(Request request, ClusterState clusterState) {
public void validate(Params params, ClusterState clusterState) {
}
/**
* Creates a AllocatedPersistentTask for communicating with task manager
*/
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<Params> taskInProgress) {
return new AllocatedPersistentTask(id, type, action, getDescription(taskInProgress), parentTaskId);
}
/**
* Returns task description that will be available via task manager
*/
protected String getDescription(PersistentTask<Params> taskInProgress) {
return "id=" + taskInProgress.getId();
}
/**
* This operation will be executed on the executor node.
* <p>
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished.
*/
protected abstract void nodeOperation(AllocatedPersistentTask task, Request request);
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params);
public String getExecutor() {
return executor;

View File

@ -31,8 +31,8 @@ public class PersistentTasksExecutorRegistry extends AbstractComponent {
}
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> PersistentTasksExecutor<Request> getPersistentTaskExecutorSafe(String taskName) {
PersistentTasksExecutor<Request> executor = (PersistentTasksExecutor<Request>) taskExecutors.get(taskName);
public <Params extends PersistentTaskParams> PersistentTasksExecutor<Params> getPersistentTaskExecutorSafe(String taskName) {
PersistentTasksExecutor<Params> executor = (PersistentTasksExecutor<Params>) taskExecutors.get(taskName);
if (executor == null) {
throw new IllegalStateException("Unknown persistent executor [" + taskName + "]");
}

View File

@ -18,6 +18,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -118,16 +120,36 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
}
private <Request extends PersistentTaskRequest> void startTask(PersistentTask<Request> taskInProgress) {
PersistentTasksExecutor<Request> action = persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
private <Params extends PersistentTaskParams> void startTask(PersistentTask<Params> taskInProgress) {
PersistentTasksExecutor<Params> executor =
persistentTasksExecutorRegistry.getPersistentTaskExecutorSafe(taskInProgress.getTaskName());
TaskAwareRequest request = new TaskAwareRequest() {
TaskId parentTaskId = new TaskId("cluster", taskInProgress.getAllocationId());
@Override
public void setParentTask(TaskId taskId) {
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
}
@Override
public TaskId getParentTask() {
return parentTaskId;
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return executor.createTask(id, type, action, parentTaskId, taskInProgress);
}
};
AllocatedPersistentTask task = (AllocatedPersistentTask) taskManager.register("persistent", taskInProgress.getTaskName() + "[c]",
taskInProgress.getRequest());
request);
boolean processed = false;
try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
try {
runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action);
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
} catch (Exception e) {
// Submit task failure
task.markAsFailed(e);

View File

@ -45,13 +45,13 @@ public class PersistentTasksService extends AbstractComponent {
* Creates the specified persistent task and attempts to assign it to a node.
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskId, String taskName, Request request,
ActionListener<PersistentTask<Request>> listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest =
new CreatePersistentTaskAction.Request(taskId, taskName, request);
public <Params extends PersistentTaskParams> void startPersistentTask(String taskId, String taskName, @Nullable Params params,
ActionListener<PersistentTask<Params>> listener) {
StartPersistentTaskAction.Request createPersistentActionRequest =
new StartPersistentTaskAction.Request(taskId, taskName, params);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
@ -170,8 +170,8 @@ public class PersistentTasksService extends AbstractComponent {
}
}
public interface WaitForPersistentTaskStatusListener<Request extends PersistentTaskRequest>
extends ActionListener<PersistentTask<Request>> {
public interface WaitForPersistentTaskStatusListener<Params extends PersistentTaskParams>
extends ActionListener<PersistentTask<Params>> {
default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout));
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -34,14 +35,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* This action can be used to add the record for the persistent action to the cluster state.
*/
public class CreatePersistentTaskAction extends Action<CreatePersistentTaskAction.Request,
public class StartPersistentTaskAction extends Action<StartPersistentTaskAction.Request,
PersistentTaskResponse,
CreatePersistentTaskAction.RequestBuilder> {
StartPersistentTaskAction.RequestBuilder> {
public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/create";
public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/start";
private CreatePersistentTaskAction() {
private StartPersistentTaskAction() {
super(NAME);
}
@ -59,18 +60,19 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
private String taskId;
@Nullable
private String action;
private PersistentTaskRequest request;
private PersistentTaskParams params;
public Request() {
}
public Request(String taskId, String action, PersistentTaskRequest request) {
public Request(String taskId, String action, PersistentTaskParams params) {
this.taskId = taskId;
this.action = action;
this.request = request;
this.params = params;
}
@Override
@ -78,7 +80,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.readFrom(in);
taskId = in.readString();
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
params = in.readOptionalNamedWriteable(PersistentTaskParams.class);
}
@Override
@ -86,7 +88,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super.writeTo(out);
out.writeString(taskId);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeOptionalNamedWriteable(params);
}
@Override
@ -98,9 +100,6 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (this.action == null) {
validationException = addValidationError("action must be specified", validationException);
}
if (this.request == null) {
validationException = addValidationError("request must be specified", validationException);
}
return validationException;
}
@ -110,12 +109,12 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request);
Objects.equals(params, request1.params);
}
@Override
public int hashCode() {
return Objects.hash(taskId, action, request);
return Objects.hash(taskId, action, params);
}
public String getAction() {
@ -134,20 +133,21 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.taskId = taskId;
}
public PersistentTaskRequest getRequest() {
return request;
public PersistentTaskParams getParams() {
return params;
}
public void setRequest(PersistentTaskRequest request) {
this.request = request;
@Nullable
public void setParams(PersistentTaskParams params) {
this.params = params;
}
}
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
PersistentTaskResponse, CreatePersistentTaskAction.RequestBuilder> {
public static class RequestBuilder extends MasterNodeOperationRequestBuilder<StartPersistentTaskAction.Request,
PersistentTaskResponse, StartPersistentTaskAction.RequestBuilder> {
protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) {
protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) {
super(client, action, new Request());
}
@ -161,8 +161,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
return this;
}
public RequestBuilder setRequest(PersistentTaskRequest persistentTaskRequest) {
request.setRequest(persistentTaskRequest);
public RequestBuilder setRequest(PersistentTaskParams params) {
request.setParams(params);
return this;
}
@ -179,7 +179,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
PersistentTasksService persistentTasksService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTasksClusterService = persistentTasksClusterService;
NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool);
@ -206,7 +206,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request,
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.params,
new ActionListener<PersistentTask<?>>() {
@Override

View File

@ -18,7 +18,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -51,7 +51,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.build();
Map<String, PersistentTask<?>> tasks = new HashMap<>();
tasks.put("0L", new PersistentTask<PersistentTaskRequest>("0L", OpenJobAction.NAME,
tasks.put("0L", new PersistentTask<PersistentTaskParams>("0L", OpenJobAction.NAME,
new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", "")));
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
@ -80,7 +80,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
.build();
Map<String, PersistentTask<?>> tasks = new HashMap<>();
tasks.put("0L", new PersistentTask<PersistentTaskRequest>("0L", OpenJobAction.NAME,
tasks.put("0L", new PersistentTask<PersistentTaskParams>("0L", OpenJobAction.NAME,
new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes")));
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,

View File

@ -27,7 +27,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
import org.elasticsearch.xpack.security.Security;
@ -74,9 +74,9 @@ public class DatafeedJobsIT extends SecurityIntegTestCase {
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
PersistentTasksCustomMetaData::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, StartDatafeedAction.NAME,
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.NAME,
StartDatafeedAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.NAME, OpenJobAction.Request::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
PersistentTasksNodeService.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));

View File

@ -17,7 +17,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -54,7 +54,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
}
public void testValidate() {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("datafeed-foo", StartDatafeedAction.NAME,
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("datafeed-foo", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("datafeed-foo", task));
@ -75,7 +75,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testValidate_alreadyStopped() {
PersistentTasksCustomMetaData tasks;
if (randomBoolean()) {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME,
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("1L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
} else {
@ -97,7 +97,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
Map<String, PersistentTask<?>> taskMap = new HashMap<>();
Builder mlMetadataBuilder = new MlMetadata.Builder();
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_1", StartDatafeedAction.NAME,
PersistentTask<?> task = new PersistentTask<PersistentTaskParams>("datafeed-datafeed_1", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put("datafeed-datafeed_1", task);
@ -105,7 +105,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_2", StartDatafeedAction.NAME,
task = new PersistentTask<PersistentTaskParams>("datafeed-datafeed_2", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED);
taskMap.put("datafeed-datafeed_2", task);
@ -113,7 +113,7 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>("datafeed-datafeed_3", StartDatafeedAction.NAME,
task = new PersistentTask<PersistentTaskParams>("3L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put("datafeed-datafeed_3", task);

View File

@ -21,7 +21,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import java.util.ArrayList;
import java.util.Arrays;
@ -54,9 +54,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
assertThat(dumpEvent(event), PersistentTasksClusterService.reassignmentRequired(event,
new PersistentTasksClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentTaskRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
if ("never_assign".equals(((TestRequest) request).getTestParam())) {
public <Params extends PersistentTaskParams> Assignment getAssignment(
String action, ClusterState currentState, Params params) {
if ("never_assign".equals(((TestParams) params).getTestParam())) {
return NO_NODE_FOUND;
}
return randomNodeAssignment(currentState.nodes());
@ -171,10 +171,10 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
return PersistentTasksClusterService.reassignTasks(clusterState, logger,
new PersistentTasksClusterService.ExecutorNodeDecider() {
@Override
public <Request extends PersistentTaskRequest> Assignment getAssignment(
String action, ClusterState currentState, Request request) {
TestRequest testRequest = (TestRequest) request;
switch (testRequest.getTestParam()) {
public <Params extends PersistentTaskParams> Assignment getAssignment(
String action, ClusterState currentState, Params params) {
TestParams testParams = (TestParams) params;
switch (testParams.getTestParam()) {
case "assign_me":
return randomNodeAssignment(currentState.nodes());
case "dont_assign_me":
@ -185,7 +185,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
case "assign_one":
return assignOnlyOneTaskAtATime(currentState);
default:
fail("unknown param " + testRequest.getTestParam());
fail("unknown param " + testParams.getTestParam());
}
return NO_NODE_FOUND;
}
@ -279,7 +279,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
for (PersistentTask<?> task : tasks.tasks()) {
// Remove all unassigned tasks that cause changing assignments they might trigger a significant change
if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) &&
if ("never_assign".equals(((TestParams) task.getParams()).getTestParam()) &&
"change me".equals(task.getAssignment().getExplanation())) {
logger.info("removed task with changing assignment {}", task.getId());
tasksBuilder.removeTask(task.getId());
@ -346,7 +346,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
logger.info("removed all unassigned tasks and changed routing table");
if (tasks != null) {
for (PersistentTask<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) {
if (task.getExecutorNode() == null || "never_assign".equals(((TestParams) task.getParams()).getTestParam())) {
tasksBuilder.removeTask(task.getId());
}
}
@ -368,7 +368,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
}
return tasks.tasks().stream().anyMatch(task -> {
if (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode())) {
return "never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) == false;
return "never_assign".equals(((TestParams) task.getParams()).getTestParam()) == false;
}
return false;
});
@ -390,11 +390,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build()));
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestParams(param), assignment).build()));
}
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action));
tasks.addTask(UUIDs.base64UUID(), action, new TestParams(param), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -28,7 +28,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Builder;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import java.io.IOException;
import java.util.ArrayList;
@ -47,7 +47,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) {
String taskId = UUIDs.base64UUID();
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)),
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)),
randomAssignment());
if (randomBoolean()) {
// From time to time update status
@ -67,7 +67,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
return new NamedWriteableRegistry(Arrays.asList(
new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new),
new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom),
new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new),
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new Entry(Task.Status.class, Status.NAME, Status::new)
));
}
@ -138,7 +138,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
private String addRandomTask(Builder builder) {
String taskId = UUIDs.base64UUID();
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment());
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), randomAssignment());
return taskId;
}
@ -149,8 +149,8 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
));
}
@ -175,14 +175,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
assertEquals(testInstance.tasks().size(), newInstance.tasks().size());
for (PersistentTask<?> testTask : testInstance.tasks()) {
PersistentTask<TestRequest> newTask = (PersistentTask<TestRequest>) newInstance.getTask(testTask.getId());
PersistentTask<TestParams> newTask = (PersistentTask<TestParams>) newInstance.getTask(testTask.getId());
assertNotNull(newTask);
// Things that should be serialized
assertEquals(testTask.getTaskName(), newTask.getTaskName());
assertEquals(testTask.getId(), newTask.getId());
assertEquals(testTask.getStatus(), newTask.getStatus());
assertEquals(testTask.getRequest(), newTask.getRequest());
assertEquals(testTask.getParams(), newTask.getParams());
// Things that shouldn't be serialized
assertEquals(0, newTask.getAllocationId());

View File

@ -12,7 +12,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import java.util.ArrayList;
import java.util.Collection;
@ -46,13 +46,14 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
int numberOfTasks = randomIntBetween(1, 10);
String[] taskIds = new String[numberOfTasks];
List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
List<PlainActionFuture<PersistentTask<TestParams>>> futures = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
futures.add(future);
taskIds[i] = UUIDs.base64UUID();
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"),
future);
}
for (int i = 0; i < numberOfTasks; i++) {

View File

@ -19,7 +19,7 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
import org.junit.After;
@ -55,15 +55,15 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertNoRunningTasks();
}
public static class WaitForPersistentTaskStatusFuture<Request extends PersistentTaskRequest>
extends PlainActionFuture<PersistentTask<Request>>
implements WaitForPersistentTaskStatusListener<Request> {
public static class WaitForPersistentTaskStatusFuture<Params extends PersistentTaskParams>
extends PlainActionFuture<PersistentTask<Params>>
implements WaitForPersistentTaskStatusListener<Params> {
}
public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
@ -92,29 +92,31 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getAllocationId();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
long allocationId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
.setDetailed(true).get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
// Verifying parent and description
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId));
stopOrCancelTask(firstRunningTask.getTaskId());
}
public void testPersistentActionWithNoAvailableNode() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
TestRequest testRequest = new TestRequest("Blah");
testRequest.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
String taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
@ -147,8 +149,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
String taskId = future.get().getId();
assertBusy(() -> {
@ -205,13 +207,13 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testCreatePersistentTaskWithDuplicateId() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
future.get();
PlainActionFuture<PersistentTask<TestRequest>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2);
PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
assertThrows(future2, ResourceAlreadyExistsException.class);
assertBusy(() -> {

View File

@ -20,8 +20,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
protected PersistentTaskResponse createTestInstance() {
if (randomBoolean()) {
return new PersistentTaskResponse(
new PersistentTask<PersistentTaskRequest>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestRequest("test"),
new PersistentTask<PersistentTaskParams>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestParams("test"),
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else {
return new PersistentTaskResponse(null);
@ -36,7 +36,8 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new)
new NamedWriteableRegistry.Entry(PersistentTaskParams.class,
TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestParams::new)
));
}
}

View File

@ -18,12 +18,13 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import java.io.IOException;
import java.util.ArrayList;
@ -34,6 +35,10 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -59,12 +64,18 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
public void testStartTask() throws Exception {
ClusterService clusterService = createClusterService();
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestRequest> action = mock(PersistentTasksExecutor.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
when(action.getTaskName()).thenReturn("test");
int nonLocalNodesCount = randomInt(10);
// need to account for 5 original tasks on each node and their relocations
for (int i = 0; i < (nonLocalNodesCount + 1) * 10; i++) {
TaskId parentId = new TaskId("cluster", i);
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(
new TestPersistentTasksPlugin.TestTask(i, "persistent", "test", "", parentId));
}
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
int nonLocalNodesCount = randomInt(10);
MockExecutor executor = new MockExecutor();
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, new TaskManager(Settings.EMPTY), executor);
@ -76,11 +87,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i),
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestParams("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"),
tasks.addTask(UUIDs.base64UUID(), "test", new TestParams("this_param"),
new Assignment("this_node", "test assignment on this node"));
}
}
@ -101,7 +112,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
newClusterState = addTask(state, "test", null, "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
@ -109,7 +120,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Start another task on this node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest("this_param"), "this_node");
newClusterState = addTask(state, "test", new TestParams("this_param"), "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action was called this time
@ -124,7 +135,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Add task on some other node
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "some_other_node");
newClusterState = addTask(state, "test", null, "some_other_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Make sure action wasn't called again
@ -162,9 +173,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
fail("Shouldn't be called during Cluster State cancellation");
}
};
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestRequest> action = mock(PersistentTasksExecutor.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
when(action.getTaskName()).thenReturn("test");
when(action.createTask(anyLong(), anyString(), anyString(), any(), any()))
.thenReturn(new TestPersistentTasksPlugin.TestTask(1, "persistent", "test", "", new TaskId("cluster", 1)));
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
int nonLocalNodesCount = randomInt(10);
@ -179,7 +192,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
ClusterState newClusterState = state;
// Allocate first task
state = newClusterState;
newClusterState = addTask(state, "test", new TestRequest(), "this_node");
newClusterState = addTask(state, "test", null, "this_node");
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
// Check the the task is know to the task manager
@ -222,12 +235,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
private <Request extends PersistentTaskRequest> ClusterState addTask(ClusterState state, String action, Request request,
String node) {
private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,
String node) {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build();
builder.addTask(UUIDs.base64UUID(), action, params, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, String taskId, String node) {
@ -247,12 +260,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
private class Execution {
private final PersistentTaskRequest request;
private final PersistentTaskParams params;
private final AllocatedPersistentTask task;
private final PersistentTasksExecutor<?> holder;
Execution(PersistentTaskRequest request, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
this.request = request;
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
this.params = params;
this.task = task;
this.holder = holder;
}
@ -266,9 +279,9 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
@Override
public <Request extends PersistentTaskRequest> void executeTask(Request request, AllocatedPersistentTask task,
PersistentTasksExecutor<Request> action) {
executions.add(new Execution(request, task, action));
public <Params extends PersistentTaskParams> void executeTask(Params params, AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
executions.add(new Execution(params, task, executor));
}
public Execution get(int i) {

View File

@ -8,9 +8,9 @@ package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request;
import org.elasticsearch.xpack.persistent.StartPersistentTaskAction.Request;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest;
import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestParams;
import org.elasticsearch.test.AbstractStreamableTestCase;
import java.util.Collections;
@ -19,17 +19,19 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
@Override
protected Request createTestInstance() {
TestRequest testRequest = new TestRequest();
TestParams testParams;
if (randomBoolean()) {
testRequest.setTestParam(randomAlphaOfLengthBetween(1, 20));
testParams = new TestParams();
if (randomBoolean()) {
testParams.setTestParam(randomAlphaOfLengthBetween(1, 20));
}
if (randomBoolean()) {
testParams.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
}
} else {
testParams = null;
}
if (randomBoolean()) {
testRequest.setParentTask(randomAlphaOfLengthBetween(1, 20), randomLong());
}
if (randomBoolean()) {
testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
}
return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest());
return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), testParams);
}
@Override
@ -40,7 +42,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(
new Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new)
new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new)
));
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@ -78,7 +77,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
@ -104,7 +103,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksExecutor.NAME, TestRequest::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new),
new NamedWriteableRegistry.Entry(Task.Status.class,
PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new),
new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
@ -120,16 +119,16 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksCustomMetaData.TYPE),
PersistentTasksCustomMetaData::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskRequest.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestRequest::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME),
TestParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent)
);
}
public static class TestRequest extends PersistentTaskRequest {
public static class TestParams implements PersistentTaskParams {
public static final ConstructingObjectParser<TestRequest, Void> REQUEST_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestRequest((String) args[0]));
public static final ConstructingObjectParser<TestParams, Void> REQUEST_PARSER =
new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new TestParams((String) args[0]));
static {
REQUEST_PARSER.declareString(constructorArg(), new ParseField("param"));
@ -141,21 +140,18 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
private String testParam = null;
public TestRequest() {
public TestParams() {
}
public TestRequest(String testParam) {
public TestParams(String testParam) {
this.testParam = testParam;
}
public TestRequest(StreamInput in) throws IOException {
readFrom(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
public TestParams(StreamInput in) throws IOException {
executorNodeAttr = in.readOptionalString();
responseNode = in.readOptionalString();
testParam = in.readOptionalString();
}
@Override
@ -181,20 +177,11 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(executorNodeAttr);
out.writeOptionalString(responseNode);
out.writeOptionalString(testParam);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
executorNodeAttr = in.readOptionalString();
responseNode = in.readOptionalString();
testParam = in.readOptionalString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -203,7 +190,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
return builder;
}
public static TestRequest fromXContent(XContentParser parser) throws IOException {
public static TestParams fromXContent(XContentParser parser) throws IOException {
return REQUEST_PARSER.parse(parser, null);
}
@ -211,7 +198,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestRequest that = (TestRequest) o;
TestParams that = (TestParams) o;
return Objects.equals(executorNodeAttr, that.executorNodeAttr) &&
Objects.equals(responseNode, that.responseNode) &&
Objects.equals(testParam, that.testParam);
@ -221,11 +208,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public int hashCode() {
return Objects.hash(executorNodeAttr, responseNode, testParam);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new TestTask(id, type, action, getDescription(), parentTaskId);
}
}
public static class Status implements Task.Status {
@ -298,7 +280,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestRequest> {
public static class TestPersistentTasksExecutor extends PersistentTasksExecutor<TestParams> {
public static final String NAME = "cluster:admin/persistent/test";
private final ClusterService clusterService;
@ -309,12 +291,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
@Override
public Assignment getAssignment(TestRequest request, ClusterState clusterState) {
if (request.getExecutorNodeAttr() == null) {
return super.getAssignment(request, clusterState);
public Assignment getAssignment(TestParams params, ClusterState clusterState) {
if (params == null || params.getExecutorNodeAttr() == null) {
return super.getAssignment(params, clusterState);
} else {
DiscoveryNode executorNode = selectLeastLoadedNode(clusterState,
discoveryNode -> request.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
discoveryNode -> params.getExecutorNodeAttr().equals(discoveryNode.getAttributes().get("test_attr")));
if (executorNode != null) {
return new Assignment(executorNode.getId(), "test assignment");
} else {
@ -325,7 +307,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
}
@Override
protected void nodeOperation(AllocatedPersistentTask task, TestRequest request) {
protected void nodeOperation(AllocatedPersistentTask task, TestParams params) {
logger.info("started node operation for the task {}", task);
try {
TestTask testTask = (TestTask) task;
@ -385,6 +367,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
task.markAsFailed(e);
}
}
@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTask<TestParams> task) {
return new TestTask(id, type, action, getDescription(task), parentTaskId);
}
}
public static class TestTaskAction extends Action<TestTasksRequest, TestTasksResponse, TestTasksRequestBuilder> {

View File

@ -139,7 +139,7 @@ cluster:admin/xpack/ml/job/update
indices:internal/data/write/xpackdeletebyquery
cluster:internal/xpack/ml/job/update/process
cluster:admin/xpack/ml/delete_expired_data
cluster:admin/persistent/create
cluster:admin/persistent/start
cluster:admin/persistent/completion
cluster:admin/persistent/update_status
cluster:admin/persistent/remove