Persistent Tasks: switch from long task ids to string task ids (elastic/x-pack-elasticsearch#1035)

This commit switches from long persistent task ids to caller-supplied string persistent task ids.

Original commit: elastic/x-pack-elasticsearch@2dff985df7
This commit is contained in:
Igor Motov 2017-04-11 12:24:54 -04:00 committed by GitHub
parent a7dfbcd2cb
commit 428af93f7b
31 changed files with 409 additions and 360 deletions

View File

@ -441,7 +441,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
private void normalCloseJob(ClusterState currentState, Task task, Request request, private void normalCloseJob(ClusterState currentState, Task task, Request request,
ActionListener<Response> listener) { ActionListener<Response> listener) {
Map<String, Long> jobIdToPersistentTaskId = new HashMap<>(); Map<String, String> jobIdToPersistentTaskId = new HashMap<>();
for (String jobId : request.resolvedJobIds) { for (String jobId : request.resolvedJobIds) {
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
@ -460,11 +460,11 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
// Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed // Wait for job to be marked as closed in cluster state, which means the job persistent task has been removed
// This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state, // This api returns when job has been closed, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here. // so wait for that to happen here.
void waitForJobClosed(Request request, Map<String, Long> jobIdToPersistentTaskId, Response response, void waitForJobClosed(Request request, Map<String, String> jobIdToPersistentTaskId, Response response,
ActionListener<Response> listener) { ActionListener<Response> listener) {
persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
for (Map.Entry<String, Long> entry : jobIdToPersistentTaskId.entrySet()) { for (Map.Entry<String, String> entry : jobIdToPersistentTaskId.entrySet()) {
long persistentTaskId = entry.getValue(); String persistentTaskId = entry.getValue();
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false; return false;
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -317,13 +318,13 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
listener.onFailure(e); listener.onFailure(e);
} }
}; };
persistentTasksService.startPersistentTask(NAME, request, finalListener); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), NAME, request, finalListener);
} else { } else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
} }
} }
void waitForJobStarted(long taskId, Request request, ActionListener<Response> listener) { void waitForJobStarted(String taskId, Request request, ActionListener<Response> listener) {
JobPredicate predicate = new JobPredicate(); JobPredicate predicate = new JobPredicate();
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout, persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout,
new WaitForPersistentTaskStatusListener<Request>() { new WaitForPersistentTaskStatusListener<Request>() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -364,13 +365,13 @@ public class StartDatafeedAction
listener.onFailure(e); listener.onFailure(e);
} }
}; };
persistentTasksService.startPersistentTask(NAME, request, finalListener); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), NAME, request, finalListener);
} else { } else {
listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING));
} }
} }
void waitForDatafeedStarted(long taskId, Request request, ActionListener<Response> listener) { void waitForDatafeedStarted(String taskId, Request request, ActionListener<Response> listener) {
Predicate<PersistentTask<?>> predicate = persistentTask -> { Predicate<PersistentTask<?>> predicate = persistentTask -> {
if (persistentTask == null) { if (persistentTask == null) {
return false; return false;

View File

@ -313,7 +313,7 @@ public class StopDatafeedAction
} }
} else { } else {
Set<String> executorNodes = new HashSet<>(); Set<String> executorNodes = new HashSet<>();
Map<String, Long> datafeedIdToPersistentTaskId = new HashMap<>(); Map<String, String> datafeedIdToPersistentTaskId = new HashMap<>();
for (String datafeedId : resolvedDatafeeds) { for (String datafeedId : resolvedDatafeeds) {
PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); PersistentTask<?> datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks);
@ -350,10 +350,11 @@ public class StopDatafeedAction
// Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed
// This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here. // so wait for that to happen here.
void waitForDatafeedStopped(Map<String, Long> datafeedIdToPersistentTaskId, Request request, Response response, ActionListener<Response> listener) { void waitForDatafeedStopped(Map<String, String> datafeedIdToPersistentTaskId, Request request, Response response,
ActionListener<Response> listener) {
persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
for (Map.Entry<String, Long> entry : datafeedIdToPersistentTaskId.entrySet()) { for (Map.Entry<String, String> entry : datafeedIdToPersistentTaskId.entrySet()) {
long persistentTaskId = entry.getValue(); String persistentTaskId = entry.getValue();
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
return false; return false;
} }

View File

@ -293,7 +293,7 @@ public class DatafeedManager extends AbstractComponent {
public class Holder { public class Holder {
private final long taskId; private final String taskId;
private final DatafeedConfig datafeed; private final DatafeedConfig datafeed;
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
@ -303,7 +303,7 @@ public class DatafeedManager extends AbstractComponent {
private final Consumer<Exception> handler; private final Consumer<Exception> handler;
volatile Future<?> future; volatile Future<?> future;
Holder(long taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, Holder(String taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker,
Consumer<Exception> handler) { Consumer<Exception> handler) {
this.taskId = taskId; this.taskId = taskId;
this.datafeed = datafeed; this.datafeed = datafeed;

View File

@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
* Represents a executor node operation that corresponds to a persistent task * Represents a executor node operation that corresponds to a persistent task
*/ */
public class AllocatedPersistentTask extends CancellableTask { public class AllocatedPersistentTask extends CancellableTask {
private long persistentTaskId; private String persistentTaskId;
private long allocationId; private long allocationId;
private final AtomicReference<State> state; private final AtomicReference<State> state;
@ -68,11 +68,11 @@ public class AllocatedPersistentTask extends CancellableTask {
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
} }
public long getPersistentTaskId() { public String getPersistentTaskId() {
return persistentTaskId; return persistentTaskId;
} }
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
allocationId) { allocationId) {
this.persistentTasksService = persistentTasksService; this.persistentTasksService = persistentTasksService;
this.logger = logger; this.logger = logger;

View File

@ -29,6 +29,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste
import java.io.IOException; import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/** /**
* Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be * Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure. * removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
@ -56,7 +58,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
private long taskId; private String taskId;
private Exception exception; private Exception exception;
@ -64,7 +66,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
} }
public Request(long taskId, Exception exception) { public Request(String taskId, Exception exception) {
this.taskId = taskId; this.taskId = taskId;
this.exception = exception; this.exception = exception;
} }
@ -72,20 +74,24 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readLong(); taskId = in.readString();
exception = in.readException(); exception = in.readException();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(taskId); out.writeString(taskId);
out.writeException(exception); out.writeException(exception);
} }
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
return null; ActionRequestValidationException validationException = null;
if (taskId == null) {
validationException = addValidationError("task id is missing", validationException);
}
return validationException;
} }
@Override @Override
@ -93,7 +99,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o; Request request = (Request) o;
return taskId == request.taskId && return Objects.equals(taskId, request.taskId) &&
Objects.equals(exception, request.exception); Objects.equals(exception, request.exception);
} }

View File

@ -57,6 +57,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
private String taskId;
private String action; private String action;
private PersistentTaskRequest request; private PersistentTaskRequest request;
@ -65,7 +67,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
} }
public Request(String action, PersistentTaskRequest request) { public Request(String taskId, String action, PersistentTaskRequest request) {
this.taskId = taskId;
this.action = action; this.action = action;
this.request = request; this.request = request;
} }
@ -73,6 +76,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readString();
action = in.readString(); action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class); request = in.readNamedWriteable(PersistentTaskRequest.class);
} }
@ -80,6 +84,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(taskId);
out.writeString(action); out.writeString(action);
out.writeNamedWriteable(request); out.writeNamedWriteable(request);
} }
@ -87,6 +92,9 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null; ActionRequestValidationException validationException = null;
if (this.taskId == null) {
validationException = addValidationError("task id must be specified", validationException);
}
if (this.action == null) { if (this.action == null) {
validationException = addValidationError("action must be specified", validationException); validationException = addValidationError("action must be specified", validationException);
} }
@ -101,13 +109,13 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o; Request request1 = (Request) o;
return Objects.equals(action, request1.action) && return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request); Objects.equals(request, request1.request);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(action, request); return Objects.hash(taskId, action, request);
} }
public String getAction() { public String getAction() {
@ -118,6 +126,14 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.action = action; this.action = action;
} }
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public PersistentTaskRequest getRequest() { public PersistentTaskRequest getRequest() {
return request; return request;
} }
@ -135,6 +151,11 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super(client, action, new Request()); super(client, action, new Request());
} }
public RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId);
return this;
}
public RequestBuilder setAction(String action) { public RequestBuilder setAction(String action) {
request.setAction(action); request.setAction(action);
return this; return this;
@ -185,7 +206,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override @Override
protected final void masterOperation(final Request request, ClusterState state, protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) { final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request,
new ActionListener<PersistentTask<?>>() { new ActionListener<PersistentTask<?>>() {
@Override @Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -46,15 +47,19 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param request request * @param request request
* @param listener the listener that will be called when task is started * @param listener the listener that will be called when task is started
*/ */
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request, public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
ActionListener<PersistentTask<?>> listener) { ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request); validate(action, clusterService.state(), request);
final Assignment assignment; final Assignment assignment;
assignment = getAssignement(action, currentState, request); assignment = getAssignement(action, currentState, request);
return update(currentState, builder(currentState).addTask(action, request, assignment)); return update(currentState, builder.addTask(taskId, action, request, assignment));
} }
@Override @Override
@ -67,7 +72,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) { if (tasks != null) {
listener.onResponse(tasks.getTask(tasks.getCurrentId())); listener.onResponse(tasks.getTask(taskId));
} else { } else {
listener.onResponse(null); listener.onResponse(null);
} }
@ -83,7 +88,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param failure the reason for restarting the task or null if the task completed successfully * @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void completePersistentTask(long id, Exception failure, ActionListener<PersistentTask<?>> listener) { public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
final String source; final String source;
if (failure != null) { if (failure != null) {
logger.warn("persistent task " + id + " failed", failure); logger.warn("persistent task " + id + " failed", failure);
@ -124,7 +129,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param id the id of a persistent task * @param id the id of a persistent task
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void removePersistentTask(long id, ActionListener<PersistentTask<?>> listener) { public void removePersistentTask(String id, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
@ -152,12 +157,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
/** /**
* Update task status * Update task status
* *
* @param id the id of a persistent task * @param id the id of a persistent task
* @param allocationId the expected allocation id of the persistent task * @param allocationId the expected allocation id of the persistent task
* @param status new status * @param status new status
* @param listener the listener that will be called when task is removed * @param listener the listener that will be called when task is removed
*/ */
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) { public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {

View File

@ -5,6 +5,8 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -34,7 +36,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.BiFunction; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -50,12 +52,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
// TODO: Implement custom Diff for tasks // TODO: Implement custom Diff for tasks
private final Map<Long, PersistentTask<?>> tasks; private final Map<String, PersistentTask<?>> tasks;
private final long currentId; private final long lastAllocationId;
public PersistentTasksCustomMetaData(long currentId, Map<Long, PersistentTask<?>> tasks) { public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, PersistentTask<?>> tasks) {
this.currentId = currentId; this.lastAllocationId = lastAllocationId;
this.tasks = tasks; this.tasks = tasks;
} }
@ -74,7 +76,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
static { static {
// Tasks parser initialization // Tasks parser initialization
PERSISTENT_TASKS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id")); PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks")); PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
// Assignment parser // Assignment parser
@ -82,7 +84,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation")); ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization // Task parser initialization
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id")); PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name")); PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects( PERSISTENT_TASK_PARSER.declareNamedObjects(
@ -111,11 +113,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this.tasks.values(); return this.tasks.values();
} }
public Map<Long, PersistentTask<?>> taskMap() { public Map<String, PersistentTask<?>> taskMap() {
return this.tasks; return this.tasks;
} }
public PersistentTask<?> getTask(long id) { public PersistentTask<?> getTask(String id) {
return this.tasks.get(id); return this.tasks.get(id);
} }
@ -137,13 +139,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
PersistentTasksCustomMetaData that = (PersistentTasksCustomMetaData) o; PersistentTasksCustomMetaData that = (PersistentTasksCustomMetaData) o;
return currentId == that.currentId && return lastAllocationId == that.lastAllocationId &&
Objects.equals(tasks, that.tasks); Objects.equals(tasks, that.tasks);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(tasks, currentId); return Objects.hash(tasks, lastAllocationId);
} }
@Override @Override
@ -171,10 +173,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, long taskId) { public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, String taskId) {
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) { if (tasks != null) {
return (PersistentTask<Request>)tasks.getTask(taskId); return (PersistentTask<Request>) tasks.getTask(taskId);
} }
return null; return null;
} }
@ -219,7 +221,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override @Override
public String toString() { public String toString() {
return "node: [" + executorNode + "], explanation: [" + explanation +"]"; return "node: [" + executorNode + "], explanation: [" + explanation + "]";
} }
} }
@ -229,7 +231,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* A record that represents a single running persistent task * A record that represents a single running persistent task
*/ */
public static class PersistentTask<Request extends PersistentTaskRequest> implements Writeable, ToXContent { public static class PersistentTask<Request extends PersistentTaskRequest> implements Writeable, ToXContent {
private final long id; private final String id;
private final long allocationId; private final long allocationId;
private final String taskName; private final String taskName;
private final Request request; private final Request request;
@ -240,12 +242,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate; private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(long id, String taskName, Request request, Assignment assignment) { public PersistentTask(String id, String taskName, Request request, long allocationId, Assignment assignment) {
this(id, 0L, taskName, request, null, assignment, null); this(id, allocationId, taskName, request, null, assignment, null);
} }
public PersistentTask(PersistentTask<Request> task, Assignment assignment) { public PersistentTask(PersistentTask<Request> task, long allocationId, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status, this(task.id, allocationId, task.taskName, task.request, task.status,
assignment, task.allocationId); assignment, task.allocationId);
} }
@ -254,7 +256,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
task.assignment, task.allocationId); task.assignment, task.allocationId);
} }
private PersistentTask(long id, long allocationId, String taskName, Request request, private PersistentTask(String id, long allocationId, String taskName, Request request,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id; this.id = id;
this.allocationId = allocationId; this.allocationId = allocationId;
@ -264,12 +266,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
this.assignment = assignment; this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID // Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id); request.setParentTask("cluster", allocationId);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public PersistentTask(StreamInput in) throws IOException { public PersistentTask(StreamInput in) throws IOException {
id = in.readLong(); id = in.readString();
allocationId = in.readLong(); allocationId = in.readLong();
taskName = in.readString(); taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class); request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
@ -280,7 +282,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id); out.writeString(id);
out.writeLong(allocationId); out.writeLong(allocationId);
out.writeString(taskName); out.writeString(taskName);
out.writeNamedWriteable(request); out.writeNamedWriteable(request);
@ -295,7 +297,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
PersistentTask<?> that = (PersistentTask<?>) o; PersistentTask<?> that = (PersistentTask<?>) o;
return id == that.id && return Objects.equals(id, that.id) &&
allocationId == that.allocationId && allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) && Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) && Objects.equals(request, that.request) &&
@ -315,7 +317,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return Strings.toString(this); return Strings.toString(this);
} }
public long getId() { public String getId() {
return id; return id;
} }
@ -408,7 +410,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
private static class TaskBuilder<Request extends PersistentTaskRequest> { private static class TaskBuilder<Request extends PersistentTaskRequest> {
private long id; private String id;
private long allocationId; private long allocationId;
private String taskName; private String taskName;
private Request request; private Request request;
@ -416,7 +418,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private Assignment assignment = INITIAL_ASSIGNMENT; private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate; private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) { public TaskBuilder<Request> setId(String id) {
this.id = id; this.id = id;
return this; return this;
} }
@ -464,30 +466,28 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
public PersistentTasksCustomMetaData(StreamInput in) throws IOException { public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
currentId = in.readLong(); lastAllocationId = in.readLong();
tasks = in.readMap(StreamInput::readLong, PersistentTask::new); tasks = in.readMap(StreamInput::readString, PersistentTask::new);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeLong(currentId); out.writeLong(lastAllocationId);
out.writeMap(tasks, StreamOutput::writeLong, (stream, value) -> { out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
value.writeTo(stream);
});
} }
public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException { public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(MetaData.Custom.class, TYPE, in); return readDiffFrom(MetaData.Custom.class, TYPE, in);
} }
public long getCurrentId() { public long getLastAllocationId() {
return currentId; return lastAllocationId;
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId); builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks"); builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) { for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params); entry.toXContent(builder, params);
@ -505,8 +505,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
public static class Builder { public static class Builder {
private final Map<Long, PersistentTask<?>> tasks = new HashMap<>(); private final Map<String, PersistentTask<?>> tasks = new HashMap<>();
private long currentId; private long lastAllocationId;
private boolean changed; private boolean changed;
public Builder() { public Builder() {
@ -515,14 +515,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
public Builder(PersistentTasksCustomMetaData tasksInProgress) { public Builder(PersistentTasksCustomMetaData tasksInProgress) {
if (tasksInProgress != null) { if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks); tasks.putAll(tasksInProgress.tasks);
currentId = tasksInProgress.currentId; lastAllocationId = tasksInProgress.lastAllocationId;
} else { } else {
currentId = 0; lastAllocationId = 0;
} }
} }
private Builder setCurrentId(long currentId) { private Builder setLastAllocationId(long currentId) {
this.currentId = currentId; this.lastAllocationId = currentId;
return this; return this;
} }
@ -534,82 +534,79 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this; return this;
} }
private long getNextAllocationId() {
lastAllocationId++;
return lastAllocationId;
}
/** /**
* Adds a new task to the builder * Adds a new task to the builder
* <p> * <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method. * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/ */
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, Assignment assignment) { public <Request extends PersistentTaskRequest> Builder addTask(String taskId, String taskName, Request request,
Assignment assignment) {
changed = true; changed = true;
currentId++; PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request,
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment)); getNextAllocationId(), assignment));
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");
}
return this; return this;
} }
/** /**
* Reassigns the task to another node if the task exist * Reassigns the task to another node
*/ */
public Builder reassignTask(long taskId, Assignment assignment) { public Builder reassignTask(String taskId, Assignment assignment) {
PersistentTask<?> taskInProgress = tasks.get(taskId); PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) { if (taskInProgress != null) {
changed = true; changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment));
} else {
throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exits");
} }
return this; return this;
} }
/** /**
* Assigns the task to another node if the task exist and not currently assigned * Updates the task status
* <p>
* The operation is only performed if the task is not currently assigned to any nodes.
*/ */
@SuppressWarnings("unchecked") public Builder updateTaskStatus(String taskId, Status status) {
public <Request extends PersistentTaskRequest> Builder assignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
if (assignment.isAssigned()) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
}
return this;
}
/**
* Updates the task status if the task exist
*/
public Builder updateTaskStatus(long taskId, Status status) {
PersistentTask<?> taskInProgress = tasks.get(taskId); PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) { if (taskInProgress != null) {
changed = true; changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, status)); tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
} else {
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exits");
} }
return this; return this;
} }
/** /**
* Removes the task if the task exist * Removes the task
*/ */
public Builder removeTask(long taskId) { public Builder removeTask(String taskId) {
if (tasks.remove(taskId) != null) { if (tasks.remove(taskId) != null) {
changed = true; changed = true;
} else {
throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exits");
} }
return this; return this;
} }
/** /**
* Finishes the task if the task exist. * Finishes the task
* * <p>
* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped. * If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped.
*/ */
public Builder finishTask(long taskId) { public Builder finishTask(String taskId) {
PersistentTask<?> taskInProgress = tasks.get(taskId); PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) { if (taskInProgress != null) {
changed = true; changed = true;
tasks.remove(taskId); tasks.remove(taskId);
} else {
throw new ResourceNotFoundException("cannot finish task with id {" + taskId + "}, the task no longer exits");
} }
return this; return this;
} }
@ -617,14 +614,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/** /**
* Checks if the task is currently present in the list * Checks if the task is currently present in the list
*/ */
public boolean hasTask(long taskId) { public boolean hasTask(String taskId) {
return tasks.containsKey(taskId); return tasks.containsKey(taskId);
} }
/** /**
* Checks if the task is currently present in the list and has the right allocation id * Checks if the task is currently present in the list and has the right allocation id
*/ */
public boolean hasTask(long taskId, long allocationId) { public boolean hasTask(String taskId, long allocationId) {
PersistentTask<?> taskInProgress = tasks.get(taskId); PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) { if (taskInProgress != null) {
return taskInProgress.getAllocationId() == allocationId; return taskInProgress.getAllocationId() == allocationId;
@ -632,11 +629,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return false; return false;
} }
/** Set<String> getCurrentTaskIds() {
* Returns the id of the last added task return tasks.keySet();
*/
public long getCurrentId() {
return currentId;
} }
/** /**
@ -647,7 +641,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
} }
public PersistentTasksCustomMetaData build() { public PersistentTasksCustomMetaData build() {
return new PersistentTasksCustomMetaData(currentId, Collections.unmodifiableMap(tasks)); return new PersistentTasksCustomMetaData(lastAllocationId, Collections.unmodifiableMap(tasks));
} }
} }
} }

View File

@ -35,7 +35,7 @@ import static java.util.Objects.requireNonNull;
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
*/ */
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener { public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
private final Map<PersistentTaskId, AllocatedPersistentTask> runningTasks = new HashMap<>(); private final Map<Long, AllocatedPersistentTask> runningTasks = new HashMap<>();
private final PersistentTasksService persistentTasksService; private final PersistentTasksService persistentTasksService;
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry; private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
private final TaskManager taskManager; private final TaskManager taskManager;
@ -84,24 +84,24 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) { if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node // We have some changes let's check if they are related to our node
String localNodeId = event.state().getNodes().getLocalNodeId(); String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet()); Set<Long> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) { if (tasks != null) {
for (PersistentTask<?> taskInProgress : tasks.tasks()) { for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) { if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()); Long allocationId = taskInProgress.getAllocationId();
AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId); AllocatedPersistentTask persistentTask = runningTasks.get(allocationId);
if (persistentTask == null) { if (persistentTask == null) {
// New task - let's start it // New task - let's start it
startTask(taskInProgress); startTask(taskInProgress);
} else { } else {
// The task is still running // The task is still running
notVisitedTasks.remove(persistentTaskId); notVisitedTasks.remove(allocationId);
} }
} }
} }
} }
for (PersistentTaskId id : notVisitedTasks) { for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id); AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) { if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
// Result was sent to the caller and the caller acknowledged acceptance of the result // Result was sent to the caller and the caller acknowledged acceptance of the result
@ -126,7 +126,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
try { try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
try { try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task); runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action); nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action);
} catch (Exception e) { } catch (Exception e) {
// Submit task failure // Submit task failure
@ -145,52 +145,26 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
* Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon * Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon
* cancellation. * cancellation.
*/ */
private void cancelTask(PersistentTaskId persistentTaskId) { private void cancelTask(Long allocationId) {
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); AllocatedPersistentTask task = runningTasks.remove(allocationId);
if (task != null) { if (task.markAsCancelled()) {
if (task.markAsCancelled()) { // Cancel the local task using the task manager
// Cancel the local task using the task manager persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() { @Override
@Override public void onResponse(CancelTasksResponse cancelTasksResponse) {
public void onResponse(CancelTasksResponse cancelTasksResponse) { logger.trace("Persistent task with id {} was cancelled", task.getId());
logger.trace("Persistent task with id {} was cancelled", task.getId());
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
// There is really nothing we can do in case of failure here // There is really nothing we can do in case of failure here
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e); logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
} }
}); });
}
} }
} }
private static class PersistentTaskId {
private final long id;
private final long allocationId;
PersistentTaskId(long id, long allocationId) {
this.id = id;
this.allocationId = allocationId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTaskId that = (PersistentTaskId) o;
return id == that.id &&
allocationId == that.allocationId;
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId);
}
}
public static class Status implements Task.Status { public static class Status implements Task.Status {
public static final String NAME = "persistent_executor"; public static final String NAME = "persistent_executor";

View File

@ -45,9 +45,10 @@ public class PersistentTasksService extends AbstractComponent {
* Creates the specified persistent task and attempts to assign it to a node. * Creates the specified persistent task and attempts to assign it to a node.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskName, Request request, public <Request extends PersistentTaskRequest> void startPersistentTask(String taskId, String taskName, Request request,
ActionListener<PersistentTask<Request>> listener) { ActionListener<PersistentTask<Request>> listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request); CreatePersistentTaskAction.Request createPersistentActionRequest =
new CreatePersistentTaskAction.Request(taskId, taskName, request);
try { try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure)); o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
@ -59,7 +60,7 @@ public class PersistentTasksService extends AbstractComponent {
/** /**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
*/ */
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) { public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure); CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
try { try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
@ -90,7 +91,7 @@ public class PersistentTasksService extends AbstractComponent {
* Persistent task implementers shouldn't call this method directly and use * Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead * {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/ */
void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) { void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest = UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try { try {
@ -104,7 +105,7 @@ public class PersistentTasksService extends AbstractComponent {
/** /**
* Cancels if needed and removes a persistent task * Cancels if needed and removes a persistent task
*/ */
public void cancelPersistentTask(long taskId, ActionListener<PersistentTask<?>> listener) { public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
try { try {
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
@ -118,7 +119,7 @@ public class PersistentTasksService extends AbstractComponent {
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't * Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
* waits of it. * waits of it.
*/ */
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout, public void waitForPersistentTaskStatus(String taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener<?> listener) { WaitForPersistentTaskStatusListener<?> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) { if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -24,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -54,30 +52,30 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
private long taskId; private String taskId;
public Request() { public Request() {
} }
public Request(long taskId) { public Request(String taskId) {
this.taskId = taskId; this.taskId = taskId;
} }
public void setTaskId(long taskId) { public void setTaskId(String taskId) {
this.taskId = taskId; this.taskId = taskId;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readLong(); taskId = in.readString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(taskId); out.writeString(taskId);
} }
@Override @Override
@ -90,7 +88,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o; Request request = (Request) o;
return taskId == request.taskId; return Objects.equals(taskId, request.taskId);
} }
@Override @Override
@ -106,7 +104,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
super(client, action, new Request()); super(client, action, new Request());
} }
public final RequestBuilder setTaskId(long taskId) { public final RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId); request.setTaskId(taskId);
return this; return this;
} }

View File

@ -55,7 +55,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
public static class Request extends MasterNodeRequest<Request> { public static class Request extends MasterNodeRequest<Request> {
private long taskId; private String taskId;
private long allocationId; private long allocationId;
@ -65,13 +65,13 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
} }
public Request(long taskId, long allocationId, Task.Status status) { public Request(String taskId, long allocationId, Task.Status status) {
this.taskId = taskId; this.taskId = taskId;
this.allocationId = allocationId; this.allocationId = allocationId;
this.status = status; this.status = status;
} }
public void setTaskId(long taskId) { public void setTaskId(String taskId) {
this.taskId = taskId; this.taskId = taskId;
} }
@ -86,7 +86,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
taskId = in.readLong(); taskId = in.readString();
allocationId = in.readLong(); allocationId = in.readLong();
status = in.readOptionalNamedWriteable(Task.Status.class); status = in.readOptionalNamedWriteable(Task.Status.class);
} }
@ -94,7 +94,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeLong(taskId); out.writeString(taskId);
out.writeLong(allocationId); out.writeLong(allocationId);
out.writeOptionalNamedWriteable(status); out.writeOptionalNamedWriteable(status);
} }
@ -109,7 +109,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o; Request request = (Request) o;
return taskId == request.taskId && allocationId == request.allocationId && return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId &&
Objects.equals(status, request.status); Objects.equals(status, request.status);
} }
@ -126,7 +126,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
super(client, action, new Request()); super(client, action, new Request());
} }
public final RequestBuilder setTaskId(long taskId) { public final RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId); request.setTaskId(taskId);
return this; return this;
} }

View File

@ -50,9 +50,9 @@ public class MlAssignmentNotifierTests extends ESTestCase {
new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build(); .build();
Map<Long, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
tasks.put(0L, new PersistentTask<PersistentTaskRequest>(0L, OpenJobAction.NAME, tasks.put("0L", new PersistentTask<PersistentTaskRequest>("0L", OpenJobAction.NAME,
new OpenJobAction.Request("job_id"), new Assignment("node_id", ""))); new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", "")));
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, tasks)).build(); new PersistentTasksCustomMetaData(0L, tasks)).build();
@ -79,9 +79,9 @@ public class MlAssignmentNotifierTests extends ESTestCase {
new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build(); .build();
Map<Long, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
tasks.put(0L, new PersistentTask<PersistentTaskRequest>(0L, OpenJobAction.NAME, tasks.put("0L", new PersistentTask<PersistentTaskRequest>("0L", OpenJobAction.NAME,
new OpenJobAction.Request("job_id"), new Assignment(null, "no nodes"))); new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes")));
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, tasks)).build(); new PersistentTasksCustomMetaData(0L, tasks)).build();

View File

@ -150,10 +150,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getJobs().get("1"), sameInstance(job1));
assertThat(result.getDatafeeds().get("1"), nullValue()); assertThat(result.getDatafeeds().get("1"), nullValue());
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, "1", null, JobState.CLOSED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", "1", null, JobState.CLOSED, 0L);
MlMetadata.Builder builder2 = new MlMetadata.Builder(result); MlMetadata.Builder builder2 = new MlMetadata.Builder(result);
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)))); () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task))));
assertThat(e.status(), equalTo(RestStatus.CONFLICT)); assertThat(e.status(), equalTo(RestStatus.CONFLICT));
} }
@ -273,7 +273,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));
@ -335,7 +335,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L);
PersistentTask<StartDatafeedAction.Request> taskInProgress = PersistentTask<StartDatafeedAction.Request> taskInProgress =
new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasksInProgress = PersistentTasksCustomMetaData tasksInProgress =
new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress));

View File

@ -58,10 +58,10 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false);
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id",
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<Long, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask(1L, "job_id", null, JobState.OPENED); PersistentTask<?> jobTask = createJobTask("1L", "job_id", null, JobState.OPENED, 1L);
tasks.put(1L, jobTask); tasks.put("1L", jobTask);
tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED)); tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STARTED, 2L));
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE,
@ -74,14 +74,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage());
tasks = new HashMap<>(); tasks = new HashMap<>();
tasks.put(1L, jobTask); tasks.put("1L", jobTask);
if (randomBoolean()) { if (randomBoolean()) {
tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED)); tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STOPPED, 3L));
} }
ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
.putCustom(PersistentTasksCustomMetaData.TYPE, .putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(1L, tasks))).build(); new PersistentTasksCustomMetaData(3L, tasks))).build();
CloseJobAction.validateAndReturnJobTask("job_id", cs2); CloseJobAction.validateAndReturnJobTask("job_id", cs2);
} }
@ -102,15 +102,15 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3", mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3",
Collections.singletonList("*"))); Collections.singletonList("*")));
Map<Long, PersistentTask<?>> tasks = new HashMap<>(); Map<String, PersistentTask<?>> tasks = new HashMap<>();
PersistentTask<?> jobTask = createJobTask(1L, "job_id_1", null, JobState.OPENED); PersistentTask<?> jobTask = createJobTask("1L", "job_id_1", null, JobState.OPENED, 1L);
tasks.put(1L, jobTask); tasks.put("1L", jobTask);
jobTask = createJobTask(2L, "job_id_2", null, JobState.CLOSED); jobTask = createJobTask("2L", "job_id_2", null, JobState.CLOSED, 2L);
tasks.put(2L, jobTask); tasks.put("2L", jobTask);
jobTask = createJobTask(3L, "job_id_3", null, JobState.FAILED); jobTask = createJobTask("3L", "job_id_3", null, JobState.FAILED, 3L);
tasks.put(3L, jobTask); tasks.put("3L", jobTask);
ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) ClusterState cs1 = ClusterState.builder(new ClusterName("_name"))
.metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())
@ -122,14 +122,16 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
CloseJobAction.resolveAndValidateJobId("_all", cs1)); CloseJobAction.resolveAndValidateJobId("_all", cs1));
} }
public static PersistentTask<StartDatafeedAction.Request> createTask(long id, public static PersistentTask<StartDatafeedAction.Request> createTask(String id,
String datafeedId, String datafeedId,
long startTime, long startTime,
String nodeId, String nodeId,
DatafeedState state) { DatafeedState state,
long allocationId) {
PersistentTask<StartDatafeedAction.Request> task = PersistentTask<StartDatafeedAction.Request> task =
new PersistentTask<>(id, StartDatafeedAction.NAME, new PersistentTask<>(id, StartDatafeedAction.NAME,
new StartDatafeedAction.Request(datafeedId, startTime), new StartDatafeedAction.Request(datafeedId, startTime),
allocationId,
new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment"));
task = new PersistentTask<>(task, state); task = new PersistentTask<>(task, state);
return task; return task;

View File

@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
@ -55,8 +56,8 @@ public class OpenJobActionTests extends ESTestCase {
mlBuilder.putJob(buildJobBuilder("job_id").build(), false); mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
createJobTask(1L, "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); createJobTask("1L", "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
OpenJobAction.validate("job_id", mlBuilder.build(), tasks); OpenJobAction.validate("job_id", mlBuilder.build(), tasks);
OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap())); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap()));
@ -83,8 +84,8 @@ public class OpenJobActionTests extends ESTestCase {
MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); MlMetadata.Builder mlBuilder = new MlMetadata.Builder();
mlBuilder.putJob(buildJobBuilder("job_id").build(), false); mlBuilder.putJob(buildJobBuilder("job_id").build(), false);
PersistentTask<OpenJobAction.Request> task = createJobTask(1L, "job_id", "_node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("1L", "job_id", "_node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
Exception e = expectThrows(ElasticsearchStatusException.class, Exception e = expectThrows(ElasticsearchStatusException.class,
() -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1)); () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1));
@ -103,10 +104,13 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT)) nodeAttr, Collections.emptySet(), Version.CURRENT))
.build(); .build();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new Assignment("_node_id1", "test assignment"))); taskMap.put("0L", new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 0L,
taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), new Assignment("_node_id1", "test assignment"))); new Assignment("_node_id1", "test assignment")));
taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), new Assignment("_node_id2", "test assignment"))); taskMap.put("1L", new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), 1L,
new Assignment("_node_id1", "test assignment")));
taskMap.put("2L", new PersistentTask<>("2L", OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), 2L,
new Assignment("_node_id2", "test assignment")));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
@ -128,17 +132,19 @@ public class OpenJobActionTests extends ESTestCase {
Map<String, String> nodeAttr = new HashMap<>(); Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
long allocationId = 0;
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
String nodeId = "_node_id" + i; String nodeId = "_node_id" + i;
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i);
nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT));
for (int j = 0; j < maxRunningJobsPerNode; j++) { for (int j = 0; j < maxRunningJobsPerNode; j++) {
long id = j + (maxRunningJobsPerNode * i); long id = j + (maxRunningJobsPerNode * i);
taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED)); String taskId = UUIDs.base64UUID();
taskMap.put(taskId, createJobTask(taskId, "job_id" + id, nodeId, JobState.OPENED, allocationId++));
} }
} }
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(numNodes * maxRunningJobsPerNode, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap);
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
@ -163,9 +169,9 @@ public class OpenJobActionTests extends ESTestCase {
.build(); .build();
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 1L,
new Assignment("_node_id1", "test assignment")); new Assignment("_node_id1", "test assignment"));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
MetaData.Builder metaData = MetaData.builder(); MetaData.Builder metaData = MetaData.builder();
@ -192,12 +198,12 @@ public class OpenJobActionTests extends ESTestCase {
nodeAttr, Collections.emptySet(), Version.CURRENT)) nodeAttr, Collections.emptySet(), Version.CURRENT))
.build(); .build();
Map<Long, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", null)); taskMap.put("0L", createJobTask("0L", "job_id1", "_node_id1", null, 0L));
taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", null)); taskMap.put("1L", createJobTask("1L", "job_id2", "_node_id1", null, 1L));
taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", null)); taskMap.put("2L", createJobTask("2L", "job_id3", "_node_id2", null, 2L));
taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", null)); taskMap.put("3L", createJobTask("3L", "job_id4", "_node_id2", null, 3L));
taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", null)); taskMap.put("4L", createJobTask("4L", "job_id5", "_node_id3", null, 4L));
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
@ -213,8 +219,8 @@ public class OpenJobActionTests extends ESTestCase {
Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger);
assertEquals("_node_id3", result.getExecutorNode()); assertEquals("_node_id3", result.getExecutorNode());
PersistentTask<OpenJobAction.Request> lastTask = createJobTask(5L, "job_id6", "_node_id3", null); PersistentTask<OpenJobAction.Request> lastTask = createJobTask("5L", "job_id6", "_node_id3", null, 6L);
taskMap.put(5L, lastTask); taskMap.put("5L", lastTask);
tasks = new PersistentTasksCustomMetaData(6L, taskMap); tasks = new PersistentTasksCustomMetaData(6L, taskMap);
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
@ -224,8 +230,8 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertNull("no node selected, because OPENING state", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, new Assignment("_node_id3", "test assignment"))); taskMap.put("5L", new PersistentTask<>(lastTask, 7L, new Assignment("_node_id3", "test assignment")));
tasks = new PersistentTasksCustomMetaData(6L, taskMap); tasks = new PersistentTasksCustomMetaData(7L, taskMap);
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
@ -234,8 +240,8 @@ public class OpenJobActionTests extends ESTestCase {
assertNull("no node selected, because stale task", result.getExecutorNode()); assertNull("no node selected, because stale task", result.getExecutorNode());
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
taskMap.put(5L, new PersistentTask<>(lastTask, (Task.Status) null)); taskMap.put("5L", new PersistentTask<>(lastTask, (Task.Status) null));
tasks = new PersistentTasksCustomMetaData(6L, taskMap); tasks = new PersistentTasksCustomMetaData(8L, taskMap);
csBuilder = ClusterState.builder(cs); csBuilder = ClusterState.builder(cs);
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
@ -280,11 +286,13 @@ public class OpenJobActionTests extends ESTestCase {
assertEquals(indexToRemove, result.get(0)); assertEquals(indexToRemove, result.get(0));
} }
public static PersistentTask<OpenJobAction.Request> createJobTask(long id, String jobId, String nodeId, JobState jobState) { public static PersistentTask<OpenJobAction.Request> createJobTask(String id, String jobId, String nodeId, JobState jobState,
long allocationId) {
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment")); new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId,
new Assignment(nodeId, "test assignment"));
if (jobState != null) { if (jobState != null) {
task = new PersistentTask<>(task, new JobTaskStatus(jobState, 0L)); task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId));
} }
return task; return task;
} }

View File

@ -70,8 +70,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED);
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", jobState); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", jobState, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -91,8 +91,8 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState +
"] while state [opened] is required", result.getExplanation()); "] while state [opened] is required", result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 1L);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -124,8 +124,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED));
@ -164,8 +164,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2); List<Tuple<Integer, ShardRoutingState>> states = new ArrayList<>(2);
states.add(new Tuple<>(0, ShardRoutingState.STARTED)); states.add(new Tuple<>(0, ShardRoutingState.STARTED));
@ -203,8 +203,8 @@ public class StartDatafeedActionTests extends ESTestCase {
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))
.build(); .build();
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
@ -234,8 +234,8 @@ public class StartDatafeedActionTests extends ESTestCase {
mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo")));
String nodeId = randomBoolean() ? "node_id2" : null; String nodeId = randomBoolean() ? "node_id2" : null;
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), nodeId, JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
@ -255,8 +255,8 @@ public class StartDatafeedActionTests extends ESTestCase {
assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node",
result.getExplanation()); result.getExplanation());
task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED); task = createJobTask("0L", job.getId(), "node_id1", JobState.OPENED, 0L);
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
cs = ClusterState.builder(new ClusterName("cluster_name")) cs = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(new MetaData.Builder() .metaData(new MetaData.Builder()
.putCustom(MlMetadata.TYPE, mlMetadata.build()) .putCustom(MlMetadata.TYPE, mlMetadata.build())
@ -284,8 +284,8 @@ public class StartDatafeedActionTests extends ESTestCase {
.putJob(job1, false) .putJob(job1, false)
.build(); .build();
PersistentTask<OpenJobAction.Request> task = PersistentTask<OpenJobAction.Request> task =
new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT); new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, INITIAL_ASSIGNMENT);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task));
DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build();
MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1)
.putDatafeed(datafeedConfig1) .putDatafeed(datafeedConfig1)
@ -303,14 +303,14 @@ public class StartDatafeedActionTests extends ESTestCase {
.putDatafeed(datafeedConfig) .putDatafeed(datafeedConfig)
.build(); .build();
PersistentTask<OpenJobAction.Request> jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> jobTask = createJobTask("0L", "job_id", "node_id", JobState.OPENED, 0L);
PersistentTask<StartDatafeedAction.Request> datafeedTask = PersistentTask<StartDatafeedAction.Request> datafeedTask =
new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), new PersistentTask<>("1L", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L),
new Assignment("node_id", "test assignment")); 1L, new Assignment("node_id", "test assignment"));
datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED);
Map<Long, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
taskMap.put(0L, jobTask); taskMap.put("0L", jobTask);
taskMap.put(1L, datafeedTask); taskMap.put("1L", datafeedTask);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap);
Exception e = expectThrows(ElasticsearchStatusException.class, Exception e = expectThrows(ElasticsearchStatusException.class,

View File

@ -54,10 +54,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
} }
public void testValidate() { public void testValidate() {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME, PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build();
@ -75,9 +75,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
public void testValidate_alreadyStopped() { public void testValidate_alreadyStopped() {
PersistentTasksCustomMetaData tasks; PersistentTasksCustomMetaData tasks;
if (randomBoolean()) { if (randomBoolean()) {
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME, PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("foo2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task));
} else { } else {
tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap()); tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap());
} }
@ -94,34 +94,34 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
} }
public void testResolveAll() { public void testResolveAll() {
Map<Long, PersistentTask<?>> taskMap = new HashMap<>(); Map<String, PersistentTask<?>> taskMap = new HashMap<>();
Builder mlMetadataBuilder = new MlMetadata.Builder(); Builder mlMetadataBuilder = new MlMetadata.Builder();
PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>(1L, StartDatafeedAction.NAME, PersistentTask<?> task = new PersistentTask<PersistentTaskRequest>("1L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_1", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put(1L, task); taskMap.put("1L", task);
Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date());
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>(2L, StartDatafeedAction.NAME, task = new PersistentTask<PersistentTaskRequest>("2L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STOPPED); task = new PersistentTask<>(task, DatafeedState.STOPPED);
taskMap.put(2L, task); taskMap.put("2L", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
task = new PersistentTask<PersistentTaskRequest>(3L, StartDatafeedAction.NAME, task = new PersistentTask<PersistentTaskRequest>("3L", StartDatafeedAction.NAME,
new StartDatafeedAction.Request("datafeed_3", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", ""));
task = new PersistentTask<>(task, DatafeedState.STARTED); task = new PersistentTask<>(task, DatafeedState.STARTED);
taskMap.put(3L, task); taskMap.put("3L", task);
job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date());
datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build();
mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, taskMap); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap);
MlMetadata mlMetadata = mlMetadataBuilder.build(); MlMetadata mlMetadata = mlMetadataBuilder.build();
assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks));

View File

@ -98,8 +98,8 @@ public class DatafeedManagerTests extends ESTestCase {
Job job = createDatafeedJob().build(new Date()); Job job = createDatafeedJob().build(new Date());
mlMetadata.putJob(job, false); mlMetadata.putJob(job, false);
mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build());
PersistentTask<OpenJobAction.Request> task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); PersistentTask<OpenJobAction.Request> task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L);
PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task));
DiscoveryNodes nodes = DiscoveryNodes.builder() DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) Collections.emptyMap(), Collections.emptySet(), Version.CURRENT))

View File

@ -5,14 +5,16 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Request;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
public class CancelPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> { public class CancelPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {
return new Request(randomLong()); return new Request(randomAsciiOfLength(10));
} }
@Override @Override

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
@ -389,11 +390,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param) { Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(randomAlphaOfLength(10), new TestRequest(param), assignment).build())); tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build()));
} }
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action)); tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action));
} }
private DiscoveryNode newNode(String nodeId) { private DiscoveryNode newNode(String nodeId) {

View File

@ -5,11 +5,13 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom; import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
@ -44,11 +46,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
int numberOfTasks = randomInt(10); int numberOfTasks = randomInt(10);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), String taskId = UUIDs.base64UUID();
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)),
randomAssignment()); randomAssignment());
if (randomBoolean()) { if (randomBoolean()) {
// From time to time update status // From time to time update status
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAlphaOfLength(10))); tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10)));
} }
} }
return tasks.build(); return tasks.build();
@ -71,31 +74,30 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override @Override
protected Custom makeTestChanges(Custom testInstance) { protected Custom makeTestChanges(Custom testInstance) {
PersistentTasksCustomMetaData tasksInProgress = (PersistentTasksCustomMetaData) testInstance; Builder builder = new Builder((PersistentTasksCustomMetaData) testInstance);
Builder builder = new Builder();
switch (randomInt(3)) { switch (randomInt(3)) {
case 0: case 0:
addRandomTask(builder); addRandomTask(builder);
break; break;
case 1: case 1:
if (tasksInProgress.tasks().isEmpty()) { if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder); addRandomTask(builder);
} else { } else {
builder.reassignTask(pickRandomTask(tasksInProgress), randomAssignment()); builder.reassignTask(pickRandomTask(builder), randomAssignment());
} }
break; break;
case 2: case 2:
if (tasksInProgress.tasks().isEmpty()) { if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder); addRandomTask(builder);
} else { } else {
builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
} }
break; break;
case 3: case 3:
if (tasksInProgress.tasks().isEmpty()) { if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder); addRandomTask(builder);
} else { } else {
builder.removeTask(pickRandomTask(tasksInProgress)); builder.removeTask(pickRandomTask(builder));
} }
break; break;
} }
@ -134,13 +136,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
return builder; return builder;
} }
private Builder addRandomTask(Builder builder) { private String addRandomTask(Builder builder) {
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment()); String taskId = UUIDs.base64UUID();
return builder; builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment());
return taskId;
} }
private long pickRandomTask(PersistentTasksCustomMetaData testInstance) { private String pickRandomTask(PersistentTasksCustomMetaData.Builder testInstance) {
return randomFrom(new ArrayList<>(testInstance.tasks())).getId(); return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds()));
} }
@Override @Override
@ -189,7 +192,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
public void testBuilder() { public void testBuilder() {
PersistentTasksCustomMetaData persistentTasks = null; PersistentTasksCustomMetaData persistentTasks = null;
long lastKnownTask = -1; String lastKnownTask = "";
for (int i = 0; i < randomIntBetween(10, 100); i++) { for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder; final Builder builder;
if (randomBoolean()) { if (randomBoolean()) {
@ -199,54 +202,46 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
} }
boolean changed = false; boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) { for (int j = 0; j < randomIntBetween(1, 10); j++) {
switch (randomInt(5)) { switch (randomInt(4)) {
case 0: case 0:
lastKnownTask = addRandomTask(builder).getCurrentId(); lastKnownTask = addRandomTask(builder);
changed = true; changed = true;
break; break;
case 1: case 1:
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {
changed = true; changed = true;
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment()));
} }
builder.reassignTask(lastKnownTask, randomAssignment());
break; break;
case 2: case 2:
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {
PersistentTask<?> task = builder.build().getTask(lastKnownTask); changed = true;
if (randomBoolean()) { builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
} else {
// Trying to reassign to a different node
Assignment randomAssignment = randomAssignment();
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment);
// should change if the task was unassigned and was reassigned to a different node or started
if ((task.isAssigned() == false && randomAssignment.isAssigned())) {
changed = true;
}
}
} else { } else {
// task doesn't exist - shouldn't change String fLastKnownTask = lastKnownTask;
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment()); expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null));
} }
break; break;
case 3: case 3:
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {
changed = true; changed = true;
builder.removeTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask));
} }
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
break; break;
case 4: case 4:
if (builder.hasTask(lastKnownTask)) { if (builder.hasTask(lastKnownTask)) {
changed = true; changed = true;
builder.finishTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.finishTask(fLastKnownTask));
} }
builder.removeTask(lastKnownTask);
break;
case 5:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
builder.finishTask(lastKnownTask);
break; break;
} }
} }

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -44,17 +45,18 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
public void testFullClusterRestart() throws Exception { public void testFullClusterRestart() throws Exception {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
int numberOfTasks = randomIntBetween(1, 10); int numberOfTasks = randomIntBetween(1, 10);
long[] taskIds = new long[numberOfTasks]; String[] taskIds = new String[numberOfTasks];
List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks); List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
futures.add(future); futures.add(future);
service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); taskIds[i] = UUIDs.base64UUID();
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
} }
for (int i = 0; i < numberOfTasks; i++) { for (int i = 0; i < numberOfTasks; i++) {
taskIds[i] = futures.get(i).get().getId(); assertThat(futures.get(i).get().getId(), equalTo(taskIds[i]));
} }
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()

View File

@ -5,8 +5,10 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -61,8 +63,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionFailure() throws Exception { public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId(); long allocationId = future.get().getAllocationId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -72,7 +74,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
.get().getTasks().get(0); .get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId()); logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent // Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId)); assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
logger.info("Failing the running task"); logger.info("Failing the running task");
@ -91,8 +93,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionCompletion() throws Exception { public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId(); long taskId = future.get().getAllocationId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -112,8 +114,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
TestRequest testRequest = new TestRequest("Blah"); TestRequest testRequest = new TestRequest("Blah");
testRequest.setExecutorNodeAttr("test"); testRequest.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future);
long taskId = future.get().getId(); String taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings); String newNode = internalCluster().startNode(nodeSettings);
@ -146,8 +148,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionStatusUpdate() throws Exception { public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>(); PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId(); String taskId = future.get().getId();
assertBusy(() -> { assertBusy(() -> {
// Wait for the task to start // Wait for the task to start
@ -201,6 +203,38 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(future2.get(), nullValue()); assertThat(future2.get(), nullValue());
} }
public void testCreatePersistentTaskWithDuplicateId() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
future.get();
PlainActionFuture<PersistentTask<TestRequest>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2);
assertThrows(future2, ResourceAlreadyExistsException.class);
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
logger.info("Completing the running task");
// Fail the running task and make sure it restarts properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
assertBusy(() -> {
// Wait for the task to disappear completely
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
empty());
});
}
private void stopOrCancelTask(TaskId taskId) { private void stopOrCancelTask(TaskId taskId) {
if (randomBoolean()) { if (randomBoolean()) {

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -19,9 +20,9 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
protected PersistentTaskResponse createTestInstance() { protected PersistentTaskResponse createTestInstance() {
if (randomBoolean()) { if (randomBoolean()) {
return new PersistentTaskResponse( return new PersistentTaskResponse(
new PersistentTask<PersistentTaskRequest>(randomLong(), randomAsciiOfLength(10), new PersistentTask<PersistentTaskRequest>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestRequest("test"), new TestPersistentTasksPlugin.TestRequest("test"),
PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else { } else {
return new PersistentTaskResponse(null); return new PersistentTaskResponse(null);
} }

View File

@ -76,11 +76,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false; boolean added = false;
if (nonLocalNodesCount > 0) { if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) { for (int i = 0; i < randomInt(5); i++) {
tasks.addTask("test_action", new TestRequest("other_" + i), tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) { if (added == false && randomBoolean()) {
added = true; added = true;
tasks.addTask("test", new TestRequest("this_param"), tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"),
new Assignment("this_node", "test assignment on this node")); new Assignment("this_node", "test assignment on this node"));
} }
} }
@ -118,8 +118,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Finish both tasks // Finish both tasks
executor.get(0).task.markAsFailed(new RuntimeException()); executor.get(0).task.markAsFailed(new RuntimeException());
executor.get(1).task.markAsCompleted(); executor.get(1).task.markAsCompleted();
long failedTaskId = executor.get(0).task.getParentTaskId().getId(); String failedTaskId = executor.get(0).task.getPersistentTaskId();
long finishedTaskId = executor.get(1).task.getParentTaskId().getId(); String finishedTaskId = executor.get(1).task.getPersistentTaskId();
executor.clear(); executor.clear();
// Add task on some other node // Add task on some other node
@ -158,7 +158,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
} }
@Override @Override
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) { public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
fail("Shouldn't be called during Cluster State cancellation"); fail("Shouldn't be called during Cluster State cancellation");
} }
}; };
@ -184,8 +184,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Check the the task is know to the task manager // Check the the task is know to the task manager
assertThat(taskManager.getTasks().size(), equalTo(1)); assertThat(taskManager.getTasks().size(), equalTo(1));
Task runningTask = taskManager.getTasks().values().iterator().next(); AllocatedPersistentTask runningTask = (AllocatedPersistentTask)taskManager.getTasks().values().iterator().next();
long persistentId = runningTask.getParentTaskId().getId(); String persistentId = runningTask.getPersistentTaskId();
long localId = runningTask.getId(); long localId = runningTask.getId();
// Make sure it returns correct status // Make sure it returns correct status
Task.Status status = runningTask.getStatus(); Task.Status status = runningTask.getStatus();
@ -227,10 +227,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build(); builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build();
} }
private ClusterState reallocateTask(ClusterState state, long taskId, String node) { private ClusterState reallocateTask(ClusterState state, String taskId, String node) {
PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
assertTrue(builder.hasTask(taskId)); assertTrue(builder.hasTask(taskId));
@ -238,7 +238,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build(); builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build();
} }
private ClusterState removeTask(ClusterState state, long taskId) { private ClusterState removeTask(ClusterState state, String taskId) {
PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
assertTrue(builder.hasTask(taskId)); assertTrue(builder.hasTask(taskId));

View File

@ -5,14 +5,14 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Request;
public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> { public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {
return new Request(randomLong(), null); return new Request(randomAlphaOfLength(10), null);
} }
@Override @Override

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request; import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request;
@ -28,7 +29,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
if (randomBoolean()) { if (randomBoolean()) {
testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
} }
return new Request(randomAlphaOfLengthBetween(1, 20), new TestRequest()); return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest());
} }
@Override @Override

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.persistent; package org.elasticsearch.xpack.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.test.AbstractStreamableTestCase;
@ -17,7 +18,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
@Override @Override
protected Request createTestInstance() { protected Request createTestInstance() {
return new Request(randomLong(), randomLong(), new Status(randomAlphaOfLength(10))); return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10)));
} }
@Override @Override