Persistent Tasks: switch from long task ids to string task ids (#1035)

This commit switches from long persistent task ids to caller-supplied string persistent task ids.
This commit is contained in:
Igor Motov 2017-04-11 12:24:54 -04:00 committed by Martijn van Groningen
parent 0a1f25588b
commit 6bfea09dd6
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
19 changed files with 287 additions and 251 deletions

View File

@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private long persistentTaskId;
private String persistentTaskId;
private long allocationId;
private final AtomicReference<State> state;
@ -81,11 +81,11 @@ public class AllocatedPersistentTask extends CancellableTask {
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
}
public long getPersistentTaskId() {
public String getPersistentTaskId() {
return persistentTaskId;
}
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
allocationId) {
this.persistentTasksService = persistentTasksService;
this.logger = logger;

View File

@ -42,6 +42,8 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
@ -69,7 +71,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
private String taskId;
private Exception exception;
@ -77,7 +79,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
}
public Request(long taskId, Exception exception) {
public Request(String taskId, Exception exception) {
this.taskId = taskId;
this.exception = exception;
}
@ -85,20 +87,24 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
taskId = in.readString();
exception = in.readException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
out.writeString(taskId);
out.writeException(exception);
}
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (taskId == null) {
validationException = addValidationError("task id is missing", validationException);
}
return validationException;
}
@Override
@ -106,7 +112,7 @@ public class CompletionPersistentTaskAction extends Action<CompletionPersistentT
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId &&
return Objects.equals(taskId, request.taskId) &&
Objects.equals(exception, request.exception);
}

View File

@ -66,6 +66,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
public static class Request extends MasterNodeRequest<Request> {
private String taskId;
private String action;
private PersistentTaskRequest request;
@ -74,7 +76,8 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
}
public Request(String action, PersistentTaskRequest request) {
public Request(String taskId, String action, PersistentTaskRequest request) {
this.taskId = taskId;
this.action = action;
this.request = request;
}
@ -82,6 +85,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readString();
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
}
@ -89,6 +93,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(taskId);
out.writeString(action);
out.writeNamedWriteable(request);
}
@ -96,6 +101,9 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (this.taskId == null) {
validationException = addValidationError("task id must be specified", validationException);
}
if (this.action == null) {
validationException = addValidationError("action must be specified", validationException);
}
@ -110,13 +118,13 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request);
}
@Override
public int hashCode() {
return Objects.hash(action, request);
return Objects.hash(taskId, action, request);
}
public String getAction() {
@ -127,6 +135,14 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
this.action = action;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public PersistentTaskRequest getRequest() {
return request;
}
@ -144,6 +160,11 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
super(client, action, new Request());
}
public RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId);
return this;
}
public RequestBuilder setAction(String action) {
request.setAction(action);
return this;
@ -194,7 +215,7 @@ public class CreatePersistentTaskAction extends Action<CreatePersistentTaskActio
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request,
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request,
new ActionListener<PersistentTask<?>>() {
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.persistent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -60,15 +61,19 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
if (builder.hasTask(taskId)) {
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
}
validate(action, clusterService.state(), request);
final Assignment assignment;
assignment = getAssignement(action, currentState, request);
return update(currentState, builder(currentState).addTask(action, request, assignment));
return update(currentState, builder.addTask(taskId, action, request, assignment));
}
@Override
@ -81,7 +86,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
listener.onResponse(tasks.getTask(tasks.getCurrentId()));
listener.onResponse(tasks.getTask(taskId));
} else {
listener.onResponse(null);
}
@ -97,7 +102,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param failure the reason for restarting the task or null if the task completed successfully
* @param listener the listener that will be called when task is removed
*/
public void completePersistentTask(long id, Exception failure, ActionListener<PersistentTask<?>> listener) {
public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
final String source;
if (failure != null) {
logger.warn("persistent task " + id + " failed", failure);
@ -138,7 +143,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements
* @param id the id of a persistent task
* @param listener the listener that will be called when task is removed
*/
public void removePersistentTask(long id, ActionListener<PersistentTask<?>> listener) {
public void removePersistentTask(String id, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
@ -166,12 +171,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
/**
* Update task status
*
* @param id the id of a persistent task
* @param allocationId the expected allocation id of the persistent task
* @param status new status
* @param listener the listener that will be called when task is removed
* @param id the id of a persistent task
* @param allocationId the expected allocation id of the persistent task
* @param status new status
* @param listener the listener that will be called when task is removed
*/
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
@ -47,7 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -63,12 +65,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
// TODO: Implement custom Diff for tasks
private final Map<Long, PersistentTask<?>> tasks;
private final Map<String, PersistentTask<?>> tasks;
private final long currentId;
private final long lastAllocationId;
public PersistentTasksCustomMetaData(long currentId, Map<Long, PersistentTask<?>> tasks) {
this.currentId = currentId;
public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, PersistentTask<?>> tasks) {
this.lastAllocationId = lastAllocationId;
this.tasks = tasks;
}
@ -87,7 +89,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
static {
// Tasks parser initialization
PERSISTENT_TASKS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id"));
PERSISTENT_TASKS_PARSER.declareLong(Builder::setLastAllocationId, new ParseField("last_allocation_id"));
PERSISTENT_TASKS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_PARSER, new ParseField("tasks"));
// Assignment parser
@ -95,7 +97,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
ASSIGNMENT_PARSER.declareStringOrNull(constructorArg(), new ParseField("explanation"));
// Task parser initialization
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
@ -124,11 +126,11 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this.tasks.values();
}
public Map<Long, PersistentTask<?>> taskMap() {
public Map<String, PersistentTask<?>> taskMap() {
return this.tasks;
}
public PersistentTask<?> getTask(long id) {
public PersistentTask<?> getTask(String id) {
return this.tasks.get(id);
}
@ -150,13 +152,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTasksCustomMetaData that = (PersistentTasksCustomMetaData) o;
return currentId == that.currentId &&
return lastAllocationId == that.lastAllocationId &&
Objects.equals(tasks, that.tasks);
}
@Override
public int hashCode() {
return Objects.hash(tasks, currentId);
return Objects.hash(tasks, lastAllocationId);
}
@Override
@ -184,10 +186,10 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
@SuppressWarnings("unchecked")
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, long taskId) {
public static <Request extends PersistentTaskRequest> PersistentTask<Request> getTaskWithId(ClusterState clusterState, String taskId) {
PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (tasks != null) {
return (PersistentTask<Request>)tasks.getTask(taskId);
return (PersistentTask<Request>) tasks.getTask(taskId);
}
return null;
}
@ -232,7 +234,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public String toString() {
return "node: [" + executorNode + "], explanation: [" + explanation +"]";
return "node: [" + executorNode + "], explanation: [" + explanation + "]";
}
}
@ -242,7 +244,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
* A record that represents a single running persistent task
*/
public static class PersistentTask<Request extends PersistentTaskRequest> implements Writeable, ToXContent {
private final long id;
private final String id;
private final long allocationId;
private final String taskName;
private final Request request;
@ -253,12 +255,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private final Long allocationIdOnLastStatusUpdate;
public PersistentTask(long id, String taskName, Request request, Assignment assignment) {
this(id, 0L, taskName, request, null, assignment, null);
public PersistentTask(String id, String taskName, Request request, long allocationId, Assignment assignment) {
this(id, allocationId, taskName, request, null, assignment, null);
}
public PersistentTask(PersistentTask<Request> task, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status,
public PersistentTask(PersistentTask<Request> task, long allocationId, Assignment assignment) {
this(task.id, allocationId, task.taskName, task.request, task.status,
assignment, task.allocationId);
}
@ -267,7 +269,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
task.assignment, task.allocationId);
}
private PersistentTask(long id, long allocationId, String taskName, Request request,
private PersistentTask(String id, long allocationId, String taskName, Request request,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
@ -277,12 +279,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
request.setParentTask("cluster", id);
request.setParentTask("cluster", allocationId);
}
@SuppressWarnings("unchecked")
public PersistentTask(StreamInput in) throws IOException {
id = in.readLong();
id = in.readString();
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
@ -293,7 +295,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
out.writeString(id);
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
@ -308,7 +310,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTask<?> that = (PersistentTask<?>) o;
return id == that.id &&
return Objects.equals(id, that.id) &&
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
@ -328,7 +330,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return Strings.toString(this);
}
public long getId() {
public String getId() {
return id;
}
@ -421,7 +423,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
private static class TaskBuilder<Request extends PersistentTaskRequest> {
private long id;
private String id;
private long allocationId;
private String taskName;
private Request request;
@ -429,7 +431,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
public TaskBuilder<Request> setId(long id) {
public TaskBuilder<Request> setId(String id) {
this.id = id;
return this;
}
@ -477,30 +479,28 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
currentId = in.readLong();
tasks = in.readMap(StreamInput::readLong, PersistentTask::new);
lastAllocationId = in.readLong();
tasks = in.readMap(StreamInput::readString, PersistentTask::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(currentId);
out.writeMap(tasks, StreamOutput::writeLong, (stream, value) -> {
value.writeTo(stream);
});
out.writeLong(lastAllocationId);
out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
}
public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(MetaData.Custom.class, TYPE, in);
}
public long getCurrentId() {
return currentId;
public long getLastAllocationId() {
return lastAllocationId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("current_id", currentId);
builder.field("last_allocation_id", lastAllocationId);
builder.startArray("tasks");
for (PersistentTask<?> entry : tasks.values()) {
entry.toXContent(builder, params);
@ -518,8 +518,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public static class Builder {
private final Map<Long, PersistentTask<?>> tasks = new HashMap<>();
private long currentId;
private final Map<String, PersistentTask<?>> tasks = new HashMap<>();
private long lastAllocationId;
private boolean changed;
public Builder() {
@ -528,14 +528,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
public Builder(PersistentTasksCustomMetaData tasksInProgress) {
if (tasksInProgress != null) {
tasks.putAll(tasksInProgress.tasks);
currentId = tasksInProgress.currentId;
lastAllocationId = tasksInProgress.lastAllocationId;
} else {
currentId = 0;
lastAllocationId = 0;
}
}
private Builder setCurrentId(long currentId) {
this.currentId = currentId;
private Builder setLastAllocationId(long currentId) {
this.lastAllocationId = currentId;
return this;
}
@ -547,82 +547,79 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return this;
}
private long getNextAllocationId() {
lastAllocationId++;
return lastAllocationId;
}
/**
* Adds a new task to the builder
* <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
* After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, Assignment assignment) {
public <Request extends PersistentTaskRequest> Builder addTask(String taskId, String taskName, Request request,
Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment));
PersistentTask<?> previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request,
getNextAllocationId(), assignment));
if (previousTask != null) {
throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}");
}
return this;
}
/**
* Reassigns the task to another node if the task exist
* Reassigns the task to another node
*/
public Builder reassignTask(long taskId, Assignment assignment) {
public Builder reassignTask(String taskId, Assignment assignment) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment));
} else {
throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exits");
}
return this;
}
/**
* Assigns the task to another node if the task exist and not currently assigned
* <p>
* The operation is only performed if the task is not currently assigned to any nodes.
* Updates the task status
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> Builder assignTask(long taskId,
BiFunction<String, Request, Assignment> executorNodeFunc) {
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
if (assignment.isAssigned()) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
}
return this;
}
/**
* Updates the task status if the task exist
*/
public Builder updateTaskStatus(long taskId, Status status) {
public Builder updateTaskStatus(String taskId, Status status) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, status));
} else {
throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exits");
}
return this;
}
/**
* Removes the task if the task exist
* Removes the task
*/
public Builder removeTask(long taskId) {
public Builder removeTask(String taskId) {
if (tasks.remove(taskId) != null) {
changed = true;
} else {
throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exits");
}
return this;
}
/**
* Finishes the task if the task exist.
*
* Finishes the task
* <p>
* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped.
*/
public Builder finishTask(long taskId) {
public Builder finishTask(String taskId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.remove(taskId);
} else {
throw new ResourceNotFoundException("cannot finish task with id {" + taskId + "}, the task no longer exits");
}
return this;
}
@ -630,14 +627,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
/**
* Checks if the task is currently present in the list
*/
public boolean hasTask(long taskId) {
public boolean hasTask(String taskId) {
return tasks.containsKey(taskId);
}
/**
* Checks if the task is currently present in the list and has the right allocation id
*/
public boolean hasTask(long taskId, long allocationId) {
public boolean hasTask(String taskId, long allocationId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
return taskInProgress.getAllocationId() == allocationId;
@ -645,11 +642,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
return false;
}
/**
* Returns the id of the last added task
*/
public long getCurrentId() {
return currentId;
Set<String> getCurrentTaskIds() {
return tasks.keySet();
}
/**
@ -660,7 +654,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
}
public PersistentTasksCustomMetaData build() {
return new PersistentTasksCustomMetaData(currentId, Collections.unmodifiableMap(tasks));
return new PersistentTasksCustomMetaData(lastAllocationId, Collections.unmodifiableMap(tasks));
}
}
}

View File

@ -48,7 +48,7 @@ import static java.util.Objects.requireNonNull;
* non-transport client nodes in the cluster and monitors cluster state changes to detect started commands.
*/
public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener {
private final Map<PersistentTaskId, AllocatedPersistentTask> runningTasks = new HashMap<>();
private final Map<Long, AllocatedPersistentTask> runningTasks = new HashMap<>();
private final PersistentTasksService persistentTasksService;
private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry;
private final TaskManager taskManager;
@ -97,24 +97,24 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) {
// We have some changes let's check if they are related to our node
String localNodeId = event.state().getNodes().getLocalNodeId();
Set<PersistentTaskId> notVisitedTasks = new HashSet<>(runningTasks.keySet());
Set<Long> notVisitedTasks = new HashSet<>(runningTasks.keySet());
if (tasks != null) {
for (PersistentTask<?> taskInProgress : tasks.tasks()) {
if (localNodeId.equals(taskInProgress.getExecutorNode())) {
PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId());
AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId);
Long allocationId = taskInProgress.getAllocationId();
AllocatedPersistentTask persistentTask = runningTasks.get(allocationId);
if (persistentTask == null) {
// New task - let's start it
startTask(taskInProgress);
} else {
// The task is still running
notVisitedTasks.remove(persistentTaskId);
notVisitedTasks.remove(allocationId);
}
}
}
}
for (PersistentTaskId id : notVisitedTasks) {
for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
@ -139,7 +139,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
try {
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
try {
runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task);
runningTasks.put(taskInProgress.getAllocationId(), task);
nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action);
} catch (Exception e) {
// Submit task failure
@ -158,52 +158,26 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
* Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon
* cancellation.
*/
private void cancelTask(PersistentTaskId persistentTaskId) {
AllocatedPersistentTask task = runningTasks.remove(persistentTaskId);
if (task != null) {
if (task.markAsCancelled()) {
// Cancel the local task using the task manager
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task with id {} was cancelled", task.getId());
private void cancelTask(Long allocationId) {
AllocatedPersistentTask task = runningTasks.remove(allocationId);
if (task.markAsCancelled()) {
// Cancel the local task using the task manager
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task with id {} was cancelled", task.getId());
}
}
@Override
public void onFailure(Exception e) {
// There is really nothing we can do in case of failure here
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
}
});
}
@Override
public void onFailure(Exception e) {
// There is really nothing we can do in case of failure here
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
}
});
}
}
private static class PersistentTaskId {
private final long id;
private final long allocationId;
PersistentTaskId(long id, long allocationId) {
this.id = id;
this.allocationId = allocationId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersistentTaskId that = (PersistentTaskId) o;
return id == that.id &&
allocationId == that.allocationId;
}
@Override
public int hashCode() {
return Objects.hash(id, allocationId);
}
}
public static class Status implements Task.Status {
public static final String NAME = "persistent_executor";

View File

@ -58,9 +58,10 @@ public class PersistentTasksService extends AbstractComponent {
* Creates the specified persistent task and attempts to assign it to a node.
*/
@SuppressWarnings("unchecked")
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskName, Request request,
public <Request extends PersistentTaskRequest> void startPersistentTask(String taskId, String taskName, Request request,
ActionListener<PersistentTask<Request>> listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request);
CreatePersistentTaskAction.Request createPersistentActionRequest =
new CreatePersistentTaskAction.Request(taskId, taskName, request);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Request>) o.getTask()), listener::onFailure));
@ -72,7 +73,7 @@ public class PersistentTasksService extends AbstractComponent {
/**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
*/
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
@ -103,7 +104,7 @@ public class PersistentTasksService extends AbstractComponent {
* Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/
void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try {
@ -117,7 +118,7 @@ public class PersistentTasksService extends AbstractComponent {
/**
* Cancels if needed and removes a persistent task
*/
public void cancelPersistentTask(long taskId, ActionListener<PersistentTask<?>> listener) {
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
try {
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
@ -131,7 +132,7 @@ public class PersistentTasksService extends AbstractComponent {
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
* waits of it.
*/
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
public void waitForPersistentTaskStatus(String taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener<?> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {

View File

@ -22,7 +22,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -37,7 +36,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -67,30 +65,30 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
private String taskId;
public Request() {
}
public Request(long taskId) {
public Request(String taskId) {
this.taskId = taskId;
}
public void setTaskId(long taskId) {
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
taskId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
out.writeString(taskId);
}
@Override
@ -103,7 +101,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId;
return Objects.equals(taskId, request.taskId);
}
@Override
@ -119,7 +117,7 @@ public class RemovePersistentTaskAction extends Action<RemovePersistentTaskActio
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
public final RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId);
return this;
}

View File

@ -68,7 +68,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
public static class Request extends MasterNodeRequest<Request> {
private long taskId;
private String taskId;
private long allocationId;
private Task.Status status;
@ -76,13 +76,13 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
}
public Request(long taskId, long allocationId, Task.Status status) {
public Request(String taskId, long allocationId, Task.Status status) {
this.taskId = taskId;
this.allocationId = allocationId;
this.status = status;
}
public void setTaskId(long taskId) {
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@ -97,7 +97,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = in.readLong();
taskId = in.readString();
allocationId = in.readLong();
status = in.readOptionalNamedWriteable(Task.Status.class);
}
@ -105,7 +105,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(taskId);
out.writeString(taskId);
out.writeLong(allocationId);
out.writeOptionalNamedWriteable(status);
}
@ -120,7 +120,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return taskId == request.taskId && allocationId == request.allocationId &&
return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId &&
Objects.equals(status, request.status);
}
@ -137,7 +137,7 @@ public class UpdatePersistentTaskStatusAction extends Action<UpdatePersistentTas
super(client, action, new Request());
}
public final RequestBuilder setTaskId(long taskId) {
public final RequestBuilder setTaskId(String taskId) {
request.setTaskId(taskId);
return this;
}

View File

@ -18,14 +18,16 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.persistent.RemovePersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.RemovePersistentTaskAction.Request;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
public class CancelPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomLong());
return new Request(randomAsciiOfLength(10));
}
@Override

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -403,11 +404,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks,
Assignment assignment, String param) {
return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE,
tasks.addTask(randomAlphaOfLength(10), new TestRequest(param), assignment).build()));
tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build()));
}
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) {
tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action));
tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action));
}
private DiscoveryNode newNode(String nodeId) {

View File

@ -18,11 +18,13 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
@ -57,11 +59,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
int numberOfTasks = randomInt(10);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
for (int i = 0; i < numberOfTasks; i++) {
tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)),
String taskId = UUIDs.base64UUID();
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)),
randomAssignment());
if (randomBoolean()) {
// From time to time update status
tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAlphaOfLength(10)));
tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10)));
}
}
return tasks.build();
@ -84,31 +87,30 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
@Override
protected Custom makeTestChanges(Custom testInstance) {
PersistentTasksCustomMetaData tasksInProgress = (PersistentTasksCustomMetaData) testInstance;
Builder builder = new Builder();
Builder builder = new Builder((PersistentTasksCustomMetaData) testInstance);
switch (randomInt(3)) {
case 0:
addRandomTask(builder);
break;
case 1:
if (tasksInProgress.tasks().isEmpty()) {
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.reassignTask(pickRandomTask(tasksInProgress), randomAssignment());
builder.reassignTask(pickRandomTask(builder), randomAssignment());
}
break;
case 2:
if (tasksInProgress.tasks().isEmpty()) {
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
}
break;
case 3:
if (tasksInProgress.tasks().isEmpty()) {
if (builder.getCurrentTaskIds().isEmpty()) {
addRandomTask(builder);
} else {
builder.removeTask(pickRandomTask(tasksInProgress));
builder.removeTask(pickRandomTask(builder));
}
break;
}
@ -147,13 +149,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
return builder;
}
private Builder addRandomTask(Builder builder) {
builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment());
return builder;
private String addRandomTask(Builder builder) {
String taskId = UUIDs.base64UUID();
builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment());
return taskId;
}
private long pickRandomTask(PersistentTasksCustomMetaData testInstance) {
return randomFrom(new ArrayList<>(testInstance.tasks())).getId();
private String pickRandomTask(PersistentTasksCustomMetaData.Builder testInstance) {
return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds()));
}
@Override
@ -202,7 +205,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
public void testBuilder() {
PersistentTasksCustomMetaData persistentTasks = null;
long lastKnownTask = -1;
String lastKnownTask = "";
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final Builder builder;
if (randomBoolean()) {
@ -212,54 +215,46 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ
}
boolean changed = false;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
switch (randomInt(5)) {
switch (randomInt(4)) {
case 0:
lastKnownTask = addRandomTask(builder).getCurrentId();
lastKnownTask = addRandomTask(builder);
changed = true;
break;
case 1:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.reassignTask(lastKnownTask, randomAssignment());
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment()));
}
builder.reassignTask(lastKnownTask, randomAssignment());
break;
case 2:
if (builder.hasTask(lastKnownTask)) {
PersistentTask<?> task = builder.build().getTask(lastKnownTask);
if (randomBoolean()) {
// Trying to reassign to the same node
builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment());
} else {
// Trying to reassign to a different node
Assignment randomAssignment = randomAssignment();
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment);
// should change if the task was unassigned and was reassigned to a different node or started
if ((task.isAssigned() == false && randomAssignment.isAssigned())) {
changed = true;
}
}
changed = true;
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
} else {
// task doesn't exist - shouldn't change
builder.assignTask(lastKnownTask, (s, request) -> randomAssignment());
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null));
}
break;
case 3:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.removeTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask));
}
builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null);
break;
case 4:
if (builder.hasTask(lastKnownTask)) {
changed = true;
builder.finishTask(lastKnownTask);
} else {
String fLastKnownTask = lastKnownTask;
expectThrows(ResourceNotFoundException.class, () -> builder.finishTask(fLastKnownTask));
}
builder.removeTask(lastKnownTask);
break;
case 5:
if (builder.hasTask(lastKnownTask)) {
changed = true;
}
builder.finishTask(lastKnownTask);
break;
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.persistent;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -57,17 +58,18 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase {
public void testFullClusterRestart() throws Exception {
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
int numberOfTasks = randomIntBetween(1, 10);
long[] taskIds = new long[numberOfTasks];
String[] taskIds = new String[numberOfTasks];
List<PlainActionFuture<PersistentTask<TestRequest>>> futures = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
futures.add(future);
service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
taskIds[i] = UUIDs.base64UUID();
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
}
for (int i = 0; i < numberOfTasks; i++) {
taskIds[i] = futures.get(i).get().getId();
assertThat(futures.get(i).get().getId(), equalTo(taskIds[i]));
}
PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData()

View File

@ -19,8 +19,10 @@
package org.elasticsearch.persistent;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
@ -75,8 +77,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionFailure() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long allocationId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -86,7 +88,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
.get().getTasks().get(0);
logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId());
// Verifying parent
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId));
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
logger.info("Failing the running task");
@ -105,8 +107,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionCompletion() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getAllocationId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
@ -126,8 +128,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
TestRequest testRequest = new TestRequest("Blah");
testRequest.setExecutorNodeAttr("test");
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future);
long taskId = future.get().getId();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future);
String taskId = future.get().getId();
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings);
@ -160,8 +162,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
public void testPersistentActionStatusUpdate() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
long taskId = future.get().getId();
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
String taskId = future.get().getId();
assertBusy(() -> {
// Wait for the task to start
@ -215,6 +217,38 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
assertThat(future2.get(), nullValue());
}
public void testCreatePersistentTaskWithDuplicateId() throws Exception {
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestRequest>> future = new PlainActionFuture<>();
String taskId = UUIDs.base64UUID();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future);
future.get();
PlainActionFuture<PersistentTask<TestRequest>> future2 = new PlainActionFuture<>();
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2);
assertThrows(future2, ResourceAlreadyExistsException.class);
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks().size(), equalTo(1));
});
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
logger.info("Completing the running task");
// Fail the running task and make sure it restarts properly
assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId())
.get().getTasks().size(), equalTo(1));
logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId());
assertBusy(() -> {
// Wait for the task to disappear completely
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
empty());
});
}
private void stopOrCancelTask(TaskId taskId) {
if (randomBoolean()) {

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@ -32,9 +33,9 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest
protected PersistentTaskResponse createTestInstance() {
if (randomBoolean()) {
return new PersistentTaskResponse(
new PersistentTask<PersistentTaskRequest>(randomLong(), randomAsciiOfLength(10),
new PersistentTask<PersistentTaskRequest>(UUIDs.base64UUID(), randomAsciiOfLength(10),
new TestPersistentTasksPlugin.TestRequest("test"),
PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT));
} else {
return new PersistentTaskResponse(null);
}

View File

@ -90,11 +90,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
boolean added = false;
if (nonLocalNodesCount > 0) {
for (int i = 0; i < randomInt(5); i++) {
tasks.addTask("test_action", new TestRequest("other_" + i),
tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i),
new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node"));
if (added == false && randomBoolean()) {
added = true;
tasks.addTask("test", new TestRequest("this_param"),
tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"),
new Assignment("this_node", "test assignment on this node"));
}
}
@ -132,8 +132,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Finish both tasks
executor.get(0).task.markAsFailed(new RuntimeException());
executor.get(1).task.markAsCompleted();
long failedTaskId = executor.get(0).task.getParentTaskId().getId();
long finishedTaskId = executor.get(1).task.getParentTaskId().getId();
String failedTaskId = executor.get(0).task.getPersistentTaskId();
String finishedTaskId = executor.get(1).task.getPersistentTaskId();
executor.clear();
// Add task on some other node
@ -172,7 +172,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
}
@Override
public void sendCompletionNotification(long taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
fail("Shouldn't be called during Cluster State cancellation");
}
};
@ -198,8 +198,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// Check the the task is know to the task manager
assertThat(taskManager.getTasks().size(), equalTo(1));
Task runningTask = taskManager.getTasks().values().iterator().next();
long persistentId = runningTask.getParentTaskId().getId();
AllocatedPersistentTask runningTask = (AllocatedPersistentTask)taskManager.getTasks().values().iterator().next();
String persistentId = runningTask.getPersistentTaskId();
long localId = runningTask.getId();
// Make sure it returns correct status
Task.Status status = runningTask.getStatus();
@ -241,10 +241,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE,
builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build();
builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState reallocateTask(ClusterState state, long taskId, String node) {
private ClusterState reallocateTask(ClusterState state, String taskId, String node) {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
assertTrue(builder.hasTask(taskId));
@ -252,7 +252,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build();
}
private ClusterState removeTask(ClusterState state, long taskId) {
private ClusterState removeTask(ClusterState state, String taskId) {
PersistentTasksCustomMetaData.Builder builder =
PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE));
assertTrue(builder.hasTask(taskId));

View File

@ -18,14 +18,14 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.persistent.CompletionPersistentTaskAction.Request;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.persistent.CompletionPersistentTaskAction.Request;
public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomLong(), null);
return new Request(randomAlphaOfLength(10), null);
}
@Override

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.persistent.CreatePersistentTaskAction.Request;
@ -41,7 +42,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas
if (randomBoolean()) {
testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20));
}
return new Request(randomAlphaOfLengthBetween(1, 20), new TestRequest());
return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest());
}
@Override

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.persistent;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractStreamableTestCase;
@ -30,7 +31,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase
@Override
protected Request createTestInstance() {
return new Request(randomLong(), randomLong(), new Status(randomAlphaOfLength(10)));
return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10)));
}
@Override