From 428af93f7b12d4bf7d0f942d57a826e25f3ad880 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 11 Apr 2017 12:24:54 -0400 Subject: [PATCH] Persistent Tasks: switch from long task ids to string task ids (elastic/x-pack-elasticsearch#1035) This commit switches from long persistent task ids to caller-supplied string persistent task ids. Original commit: elastic/x-pack-elasticsearch@2dff985df78cc5e4efce08e5d9a3a7f71cc57eda --- .../xpack/ml/action/CloseJobAction.java | 8 +- .../xpack/ml/action/OpenJobAction.java | 5 +- .../xpack/ml/action/StartDatafeedAction.java | 5 +- .../xpack/ml/action/StopDatafeedAction.java | 9 +- .../xpack/ml/datafeed/DatafeedManager.java | 4 +- .../persistent/AllocatedPersistentTask.java | 6 +- .../CompletionPersistentTaskAction.java | 18 +- .../CreatePersistentTaskAction.java | 29 +++- .../PersistentTasksClusterService.java | 25 +-- .../PersistentTasksCustomMetaData.java | 162 +++++++++--------- .../PersistentTasksNodeService.java | 70 +++----- .../persistent/PersistentTasksService.java | 13 +- .../RemovePersistentTaskAction.java | 16 +- .../UpdatePersistentTaskStatusAction.java | 14 +- .../xpack/ml/MlAssignmentNotifierTests.java | 12 +- .../xpack/ml/MlMetadataTests.java | 8 +- .../ml/action/CloseJobActionRequestTests.java | 34 ++-- .../xpack/ml/action/OpenJobActionTests.java | 64 ++++--- .../ml/action/StartDatafeedActionTests.java | 44 ++--- .../StopDatafeedActionRequestTests.java | 34 ++-- .../ml/datafeed/DatafeedManagerTests.java | 4 +- .../CancelPersistentTaskRequestTests.java | 6 +- .../PersistentTasksClusterServiceTests.java | 5 +- .../PersistentTasksCustomMetaDataTests.java | 79 ++++----- .../PersistentTasksExecutorFullRestartIT.java | 8 +- .../persistent/PersistentTasksExecutorIT.java | 52 +++++- .../PersistentTasksExecutorResponseTests.java | 5 +- .../PersistentTasksNodeServiceTests.java | 20 +-- .../RestartPersistentTaskRequestTests.java | 4 +- .../StartPersistentActionRequestTests.java | 3 +- .../UpdatePersistentTaskRequestTests.java | 3 +- 31 files changed, 409 insertions(+), 360 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 0f2c0745950..1dc25116b63 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -441,7 +441,7 @@ public class CloseJobAction extends Action listener) { - Map jobIdToPersistentTaskId = new HashMap<>(); + Map jobIdToPersistentTaskId = new HashMap<>(); for (String jobId : request.resolvedJobIds) { auditor.info(jobId, Messages.JOB_AUDIT_CLOSING); @@ -460,11 +460,11 @@ public class CloseJobAction extends Action jobIdToPersistentTaskId, Response response, + void waitForJobClosed(Request request, Map jobIdToPersistentTaskId, Response response, ActionListener listener) { persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { - for (Map.Entry entry : jobIdToPersistentTaskId.entrySet()) { - long persistentTaskId = entry.getValue(); + for (Map.Entry entry : jobIdToPersistentTaskId.entrySet()) { + String persistentTaskId = entry.getValue(); if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { return false; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index e2d21ef2ce6..f17614cc0e5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -317,13 +318,13 @@ public class OpenJobAction extends Action listener) { + void waitForJobStarted(String taskId, Request request, ActionListener listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout, new WaitForPersistentTaskStatusListener() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 25907c57cf9..b82d5c349a0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -364,13 +365,13 @@ public class StartDatafeedAction listener.onFailure(e); } }; - persistentTasksService.startPersistentTask(NAME, request, finalListener); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), NAME, request, finalListener); } else { listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); } } - void waitForDatafeedStarted(long taskId, Request request, ActionListener listener) { + void waitForDatafeedStarted(String taskId, Request request, ActionListener listener) { Predicate> predicate = persistentTask -> { if (persistentTask == null) { return false; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index b985a3ef7bc..1b5d6500ed7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -313,7 +313,7 @@ public class StopDatafeedAction } } else { Set executorNodes = new HashSet<>(); - Map datafeedIdToPersistentTaskId = new HashMap<>(); + Map datafeedIdToPersistentTaskId = new HashMap<>(); for (String datafeedId : resolvedDatafeeds) { PersistentTask datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); @@ -350,10 +350,11 @@ public class StopDatafeedAction // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. - void waitForDatafeedStopped(Map datafeedIdToPersistentTaskId, Request request, Response response, ActionListener listener) { + void waitForDatafeedStopped(Map datafeedIdToPersistentTaskId, Request request, Response response, + ActionListener listener) { persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { - for (Map.Entry entry : datafeedIdToPersistentTaskId.entrySet()) { - long persistentTaskId = entry.getValue(); + for (Map.Entry entry : datafeedIdToPersistentTaskId.entrySet()) { + String persistentTaskId = entry.getValue(); if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { return false; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index c3422f691d5..4b9b85be340 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -293,7 +293,7 @@ public class DatafeedManager extends AbstractComponent { public class Holder { - private final long taskId; + private final String taskId; private final DatafeedConfig datafeed; // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); @@ -303,7 +303,7 @@ public class DatafeedManager extends AbstractComponent { private final Consumer handler; volatile Future future; - Holder(long taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, + Holder(String taskId, DatafeedConfig datafeed, DatafeedJob datafeedJob, boolean autoCloseJob, ProblemTracker problemTracker, Consumer handler) { this.taskId = taskId; this.datafeed = datafeed; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java index 514eafb80d7..816d68e16f9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; * Represents a executor node operation that corresponds to a persistent task */ public class AllocatedPersistentTask extends CancellableTask { - private long persistentTaskId; + private String persistentTaskId; private long allocationId; private final AtomicReference state; @@ -68,11 +68,11 @@ public class AllocatedPersistentTask extends CancellableTask { persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); } - public long getPersistentTaskId() { + public String getPersistentTaskId() { return persistentTaskId; } - void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long + void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long allocationId) { this.persistentTasksService = persistentTasksService; this.logger = logger; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java index c1b91f5f029..b638b4b2735 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java @@ -29,6 +29,8 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Persiste import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; + /** * Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be * removed from the cluster state in case of successful completion or restarted on some other node in case of failure. @@ -56,7 +58,7 @@ public class CompletionPersistentTaskAction extends Action { - private long taskId; + private String taskId; private Exception exception; @@ -64,7 +66,7 @@ public class CompletionPersistentTaskAction extends Action { + private String taskId; + private String action; private PersistentTaskRequest request; @@ -65,7 +67,8 @@ public class CreatePersistentTaskAction extends Action listener) { - persistentTasksClusterService.createPersistentTask(request.action, request.request, + persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request, new ActionListener>() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index fe2200bd1ef..1f1e2825cd3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.persistent; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -46,15 +47,19 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param request request * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String action, Request request, + public void createPersistentTask(String taskId, String action, Request request, ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksCustomMetaData.Builder builder = builder(currentState); + if (builder.hasTask(taskId)) { + throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist"); + } validate(action, clusterService.state(), request); final Assignment assignment; assignment = getAssignement(action, currentState, request); - return update(currentState, builder(currentState).addTask(action, request, assignment)); + return update(currentState, builder.addTask(taskId, action, request, assignment)); } @Override @@ -67,7 +72,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { - listener.onResponse(tasks.getTask(tasks.getCurrentId())); + listener.onResponse(tasks.getTask(taskId)); } else { listener.onResponse(null); } @@ -83,7 +88,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param failure the reason for restarting the task or null if the task completed successfully * @param listener the listener that will be called when task is removed */ - public void completePersistentTask(long id, Exception failure, ActionListener> listener) { + public void completePersistentTask(String id, Exception failure, ActionListener> listener) { final String source; if (failure != null) { logger.warn("persistent task " + id + " failed", failure); @@ -124,7 +129,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param id the id of a persistent task * @param listener the listener that will be called when task is removed */ - public void removePersistentTask(long id, ActionListener> listener) { + public void removePersistentTask(String id, ActionListener> listener) { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -152,12 +157,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements /** * Update task status * - * @param id the id of a persistent task - * @param allocationId the expected allocation id of the persistent task - * @param status new status - * @param listener the listener that will be called when task is removed + * @param id the id of a persistent task + * @param allocationId the expected allocation id of the persistent task + * @param status new status + * @param listener the listener that will be called when task is removed */ - public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener> listener) { + public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener> listener) { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 92d4a714652..2248db7ac3a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.ClusterState; @@ -34,7 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.BiFunction; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -50,12 +52,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable> tasks; + private final Map> tasks; - private final long currentId; + private final long lastAllocationId; - public PersistentTasksCustomMetaData(long currentId, Map> tasks) { - this.currentId = currentId; + public PersistentTasksCustomMetaData(long lastAllocationId, Map> tasks) { + this.lastAllocationId = lastAllocationId; this.tasks = tasks; } @@ -74,7 +76,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable> taskMap() { + public Map> taskMap() { return this.tasks; } - public PersistentTask getTask(long id) { + public PersistentTask getTask(String id) { return this.tasks.get(id); } @@ -137,13 +139,13 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable PersistentTask getTaskWithId(ClusterState clusterState, long taskId) { + public static PersistentTask getTaskWithId(ClusterState clusterState, String taskId) { PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { - return (PersistentTask)tasks.getTask(taskId); + return (PersistentTask) tasks.getTask(taskId); } return null; } @@ -219,7 +221,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable implements Writeable, ToXContent { - private final long id; + private final String id; private final long allocationId; private final String taskName; private final Request request; @@ -240,12 +242,12 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable task, Assignment assignment) { - this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status, + public PersistentTask(PersistentTask task, long allocationId, Assignment assignment) { + this(task.id, allocationId, task.taskName, task.request, task.status, assignment, task.allocationId); } @@ -254,7 +256,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable that = (PersistentTask) o; - return id == that.id && + return Objects.equals(id, that.id) && allocationId == that.allocationId && Objects.equals(taskName, that.taskName) && Objects.equals(request, that.request) && @@ -315,7 +317,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable { - private long id; + private String id; private long allocationId; private String taskName; private Request request; @@ -416,7 +418,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable setId(long id) { + public TaskBuilder setId(String id) { this.id = id; return this; } @@ -464,30 +466,28 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable { - value.writeTo(stream); - }); + out.writeLong(lastAllocationId); + out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream)); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { return readDiffFrom(MetaData.Custom.class, TYPE, in); } - public long getCurrentId() { - return currentId; + public long getLastAllocationId() { + return lastAllocationId; } @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.field("current_id", currentId); + builder.field("last_allocation_id", lastAllocationId); builder.startArray("tasks"); for (PersistentTask entry : tasks.values()) { entry.toXContent(builder, params); @@ -505,8 +505,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable> tasks = new HashMap<>(); - private long currentId; + private final Map> tasks = new HashMap<>(); + private long lastAllocationId; private boolean changed; public Builder() { @@ -515,14 +515,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable - * After the task is added its id can be found by calling {{@link #getCurrentId()}} method. + * After the task is added its id can be found by calling {{@link #getLastAllocationId()}} method. */ - public Builder addTask(String taskName, Request request, Assignment assignment) { + public Builder addTask(String taskId, String taskName, Request request, + Assignment assignment) { changed = true; - currentId++; - tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment)); + PersistentTask previousTask = tasks.put(taskId, new PersistentTask<>(taskId, taskName, request, + getNextAllocationId(), assignment)); + if (previousTask != null) { + throw new ResourceAlreadyExistsException("Trying to override task with id {" + taskId + "}"); + } return this; } /** - * Reassigns the task to another node if the task exist + * Reassigns the task to another node */ - public Builder reassignTask(long taskId, Assignment assignment) { + public Builder reassignTask(String taskId, Assignment assignment) { PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, getNextAllocationId(), assignment)); + } else { + throw new ResourceNotFoundException("cannot reassign task with id {" + taskId + "}, the task no longer exits"); } return this; } /** - * Assigns the task to another node if the task exist and not currently assigned - *

- * The operation is only performed if the task is not currently assigned to any nodes. + * Updates the task status */ - @SuppressWarnings("unchecked") - public Builder assignTask(long taskId, - BiFunction executorNodeFunc) { - PersistentTask taskInProgress = (PersistentTask) tasks.get(taskId); - if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks - Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request); - if (assignment.isAssigned()) { - changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment)); - } - } - return this; - } - - - /** - * Updates the task status if the task exist - */ - public Builder updateTaskStatus(long taskId, Status status) { + public Builder updateTaskStatus(String taskId, Status status) { PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; tasks.put(taskId, new PersistentTask<>(taskInProgress, status)); + } else { + throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exits"); } return this; } /** - * Removes the task if the task exist + * Removes the task */ - public Builder removeTask(long taskId) { + public Builder removeTask(String taskId) { if (tasks.remove(taskId) != null) { changed = true; + } else { + throw new ResourceNotFoundException("cannot remove task with id {" + taskId + "}, the task no longer exits"); } return this; } /** - * Finishes the task if the task exist. - * + * Finishes the task + *

* If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped. */ - public Builder finishTask(long taskId) { + public Builder finishTask(String taskId) { PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; tasks.remove(taskId); + } else { + throw new ResourceNotFoundException("cannot finish task with id {" + taskId + "}, the task no longer exits"); } return this; } @@ -617,14 +614,14 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); if (taskInProgress != null) { return taskInProgress.getAllocationId() == allocationId; @@ -632,11 +629,8 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable getCurrentTaskIds() { + return tasks.keySet(); } /** @@ -647,7 +641,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable runningTasks = new HashMap<>(); + private final Map runningTasks = new HashMap<>(); private final PersistentTasksService persistentTasksService; private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry; private final TaskManager taskManager; @@ -84,24 +84,24 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) { // We have some changes let's check if they are related to our node String localNodeId = event.state().getNodes().getLocalNodeId(); - Set notVisitedTasks = new HashSet<>(runningTasks.keySet()); + Set notVisitedTasks = new HashSet<>(runningTasks.keySet()); if (tasks != null) { for (PersistentTask taskInProgress : tasks.tasks()) { if (localNodeId.equals(taskInProgress.getExecutorNode())) { - PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()); - AllocatedPersistentTask persistentTask = runningTasks.get(persistentTaskId); + Long allocationId = taskInProgress.getAllocationId(); + AllocatedPersistentTask persistentTask = runningTasks.get(allocationId); if (persistentTask == null) { // New task - let's start it startTask(taskInProgress); } else { // The task is still running - notVisitedTasks.remove(persistentTaskId); + notVisitedTasks.remove(allocationId); } } } } - for (PersistentTaskId id : notVisitedTasks) { + for (Long id : notVisitedTasks) { AllocatedPersistentTask task = runningTasks.get(id); if (task.getState() == AllocatedPersistentTask.State.COMPLETED) { // Result was sent to the caller and the caller acknowledged acceptance of the result @@ -126,7 +126,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu try { task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId()); try { - runningTasks.put(new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()), task); + runningTasks.put(taskInProgress.getAllocationId(), task); nodePersistentTasksExecutor.executeTask(taskInProgress.getRequest(), task, action); } catch (Exception e) { // Submit task failure @@ -145,52 +145,26 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu * Unregisters and then cancels the locally running task using the task manager. No notification to master will be send upon * cancellation. */ - private void cancelTask(PersistentTaskId persistentTaskId) { - AllocatedPersistentTask task = runningTasks.remove(persistentTaskId); - if (task != null) { - if (task.markAsCancelled()) { - // Cancel the local task using the task manager - persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener() { - @Override - public void onResponse(CancelTasksResponse cancelTasksResponse) { - logger.trace("Persistent task with id {} was cancelled", task.getId()); + private void cancelTask(Long allocationId) { + AllocatedPersistentTask task = runningTasks.remove(allocationId); + if (task.markAsCancelled()) { + // Cancel the local task using the task manager + persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener() { + @Override + public void onResponse(CancelTasksResponse cancelTasksResponse) { + logger.trace("Persistent task with id {} was cancelled", task.getId()); - } + } - @Override - public void onFailure(Exception e) { - // There is really nothing we can do in case of failure here - logger.warn((Supplier) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e); - } - }); - } + @Override + public void onFailure(Exception e) { + // There is really nothing we can do in case of failure here + logger.warn((Supplier) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e); + } + }); } } - private static class PersistentTaskId { - private final long id; - private final long allocationId; - - PersistentTaskId(long id, long allocationId) { - this.id = id; - this.allocationId = allocationId; - } - - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - PersistentTaskId that = (PersistentTaskId) o; - return id == that.id && - allocationId == that.allocationId; - } - - @Override - public int hashCode() { - return Objects.hash(id, allocationId); - } - } public static class Status implements Task.Status { public static final String NAME = "persistent_executor"; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index 7bff400b2ae..b2c9196b100 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -45,9 +45,10 @@ public class PersistentTasksService extends AbstractComponent { * Creates the specified persistent task and attempts to assign it to a node. */ @SuppressWarnings("unchecked") - public void startPersistentTask(String taskName, Request request, + public void startPersistentTask(String taskId, String taskName, Request request, ActionListener> listener) { - CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request); + CreatePersistentTaskAction.Request createPersistentActionRequest = + new CreatePersistentTaskAction.Request(taskId, taskName, request); try { client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); @@ -59,7 +60,7 @@ public class PersistentTasksService extends AbstractComponent { /** * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure */ - public void sendCompletionNotification(long taskId, Exception failure, ActionListener> listener) { + public void sendCompletionNotification(String taskId, Exception failure, ActionListener> listener) { CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure); try { client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, @@ -90,7 +91,7 @@ public class PersistentTasksService extends AbstractComponent { * Persistent task implementers shouldn't call this method directly and use * {@link AllocatedPersistentTask#updatePersistentStatus} instead */ - void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener> listener) { + void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener> listener) { UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); try { @@ -104,7 +105,7 @@ public class PersistentTasksService extends AbstractComponent { /** * Cancels if needed and removes a persistent task */ - public void cancelPersistentTask(long taskId, ActionListener> listener) { + public void cancelPersistentTask(String taskId, ActionListener> listener) { RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); try { client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), @@ -118,7 +119,7 @@ public class PersistentTasksService extends AbstractComponent { * Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't * waits of it. */ - public void waitForPersistentTaskStatus(long taskId, Predicate> predicate, @Nullable TimeValue timeout, + public void waitForPersistentTaskStatus(String taskId, Predicate> predicate, @Nullable TimeValue timeout, WaitForPersistentTaskStatusListener listener) { ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java index 75cd3dc4f93..e8350db2268 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -24,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -54,30 +52,30 @@ public class RemovePersistentTaskAction extends Action { - private long taskId; + private String taskId; public Request() { } - public Request(long taskId) { + public Request(String taskId) { this.taskId = taskId; } - public void setTaskId(long taskId) { + public void setTaskId(String taskId) { this.taskId = taskId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - taskId = in.readLong(); + taskId = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(taskId); + out.writeString(taskId); } @Override @@ -90,7 +88,7 @@ public class RemovePersistentTaskAction extends Action { - private long taskId; + private String taskId; private long allocationId; @@ -65,13 +65,13 @@ public class UpdatePersistentTaskStatusAction extends Action> tasks = new HashMap<>(); - tasks.put(0L, new PersistentTask(0L, OpenJobAction.NAME, - new OpenJobAction.Request("job_id"), new Assignment("node_id", ""))); + Map> tasks = new HashMap<>(); + tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, + new OpenJobAction.Request("job_id"), 0L, new Assignment("node_id", ""))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0L, tasks)).build(); @@ -79,9 +79,9 @@ public class MlAssignmentNotifierTests extends ESTestCase { new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) .build(); - Map> tasks = new HashMap<>(); - tasks.put(0L, new PersistentTask(0L, OpenJobAction.NAME, - new OpenJobAction.Request("job_id"), new Assignment(null, "no nodes"))); + Map> tasks = new HashMap<>(); + tasks.put("0L", new PersistentTask("0L", OpenJobAction.NAME, + new OpenJobAction.Request("job_id"), 0L, new Assignment(null, "no nodes"))); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0L, tasks)).build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index bc909ce0615..e269673616c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -150,10 +150,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - PersistentTask task = createJobTask(0L, "1", null, JobState.CLOSED); + PersistentTask task = createJobTask("0L", "1", null, JobState.CLOSED, 0L); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)))); + () -> builder2.deleteJob("1", new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)))); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -273,7 +273,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); PersistentTask taskInProgress = - new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); + new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); @@ -335,7 +335,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); PersistentTask taskInProgress = - new PersistentTask<>(0, StartDatafeedAction.NAME, request, INITIAL_ASSIGNMENT); + new PersistentTask<>("0", StartDatafeedAction.NAME, request, 0L, INITIAL_ASSIGNMENT); PersistentTasksCustomMetaData tasksInProgress = new PersistentTasksCustomMetaData(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java index 24e6bed6e91..f2462e376cd 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionRequestTests.java @@ -58,10 +58,10 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa mlBuilder.putJob(BaseMlIntegTestCase.createScheduledJob("job_id").build(new Date()), false); mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id", "job_id", Collections.singletonList("*"))); - Map> tasks = new HashMap<>(); - PersistentTask jobTask = createJobTask(1L, "job_id", null, JobState.OPENED); - tasks.put(1L, jobTask); - tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STARTED)); + Map> tasks = new HashMap<>(); + PersistentTask jobTask = createJobTask("1L", "job_id", null, JobState.OPENED, 1L); + tasks.put("1L", jobTask); + tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STARTED, 2L)); ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, @@ -74,14 +74,14 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa assertEquals("cannot close job [job_id], datafeed hasn't been stopped", e.getMessage()); tasks = new HashMap<>(); - tasks.put(1L, jobTask); + tasks.put("1L", jobTask); if (randomBoolean()) { - tasks.put(2L, createTask(2L, "datafeed_id", 0L, null, DatafeedState.STOPPED)); + tasks.put("2L", createTask("2L", "datafeed_id", 0L, null, DatafeedState.STOPPED, 3L)); } ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) .putCustom(PersistentTasksCustomMetaData.TYPE, - new PersistentTasksCustomMetaData(1L, tasks))).build(); + new PersistentTasksCustomMetaData(3L, tasks))).build(); CloseJobAction.validateAndReturnJobTask("job_id", cs2); } @@ -102,15 +102,15 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa mlBuilder.putDatafeed(BaseMlIntegTestCase.createDatafeed("datafeed_id_3", "job_id_3", Collections.singletonList("*"))); - Map> tasks = new HashMap<>(); - PersistentTask jobTask = createJobTask(1L, "job_id_1", null, JobState.OPENED); - tasks.put(1L, jobTask); + Map> tasks = new HashMap<>(); + PersistentTask jobTask = createJobTask("1L", "job_id_1", null, JobState.OPENED, 1L); + tasks.put("1L", jobTask); - jobTask = createJobTask(2L, "job_id_2", null, JobState.CLOSED); - tasks.put(2L, jobTask); + jobTask = createJobTask("2L", "job_id_2", null, JobState.CLOSED, 2L); + tasks.put("2L", jobTask); - jobTask = createJobTask(3L, "job_id_3", null, JobState.FAILED); - tasks.put(3L, jobTask); + jobTask = createJobTask("3L", "job_id_3", null, JobState.FAILED, 3L); + tasks.put("3L", jobTask); ClusterState cs1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) @@ -122,14 +122,16 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa CloseJobAction.resolveAndValidateJobId("_all", cs1)); } - public static PersistentTask createTask(long id, + public static PersistentTask createTask(String id, String datafeedId, long startTime, String nodeId, - DatafeedState state) { + DatafeedState state, + long allocationId) { PersistentTask task = new PersistentTask<>(id, StartDatafeedAction.NAME, new StartDatafeedAction.Request(datafeedId, startTime), + allocationId, new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); task = new PersistentTask<>(task, state); return task; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 42d0f7d5b02..ccbfe2d7e5e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.Index; @@ -55,8 +56,8 @@ public class OpenJobActionTests extends ESTestCase { mlBuilder.putJob(buildJobBuilder("job_id").build(), false); PersistentTask task = - createJobTask(1L, "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); + createJobTask("1L", "job_id2", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED), 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks); OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksCustomMetaData(1L, Collections.emptyMap())); @@ -83,8 +84,8 @@ public class OpenJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTask task = createJobTask(1L, "job_id", "_node_id", JobState.OPENED); - PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); + PersistentTask task = createJobTask("1L", "job_id", "_node_id", JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks1 = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); Exception e = expectThrows(ElasticsearchStatusException.class, () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1)); @@ -103,10 +104,13 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); - taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), new Assignment("_node_id1", "test assignment"))); - taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), new Assignment("_node_id1", "test assignment"))); - taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), new Assignment("_node_id2", "test assignment"))); + Map> taskMap = new HashMap<>(); + taskMap.put("0L", new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 0L, + new Assignment("_node_id1", "test assignment"))); + taskMap.put("1L", new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), 1L, + new Assignment("_node_id1", "test assignment"))); + taskMap.put("2L", new PersistentTask<>("2L", OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), 2L, + new Assignment("_node_id2", "test assignment"))); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); @@ -128,17 +132,19 @@ public class OpenJobActionTests extends ESTestCase { Map nodeAttr = new HashMap<>(); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); - Map> taskMap = new HashMap<>(); + Map> taskMap = new HashMap<>(); + long allocationId = 0; for (int i = 0; i < numNodes; i++) { String nodeId = "_node_id" + i; TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); for (int j = 0; j < maxRunningJobsPerNode; j++) { long id = j + (maxRunningJobsPerNode * i); - taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED)); + String taskId = UUIDs.base64UUID(); + taskMap.put(taskId, createJobTask(taskId, "job_id" + id, nodeId, JobState.OPENED, allocationId++)); } } - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(numNodes * maxRunningJobsPerNode, taskMap); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(allocationId, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); @@ -163,9 +169,9 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTask task = - new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), + new PersistentTask<>("1L", OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), 1L, new Assignment("_node_id1", "test assignment")); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); @@ -192,12 +198,12 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); - taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", null)); - taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", null)); - taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", null)); - taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", null)); - taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", null)); + Map> taskMap = new HashMap<>(); + taskMap.put("0L", createJobTask("0L", "job_id1", "_node_id1", null, 0L)); + taskMap.put("1L", createJobTask("1L", "job_id2", "_node_id1", null, 1L)); + taskMap.put("2L", createJobTask("2L", "job_id3", "_node_id2", null, 2L)); + taskMap.put("3L", createJobTask("3L", "job_id4", "_node_id2", null, 3L)); + taskMap.put("4L", createJobTask("4L", "job_id5", "_node_id3", null, 4L)); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(5L, taskMap); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); @@ -213,8 +219,8 @@ public class OpenJobActionTests extends ESTestCase { Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); assertEquals("_node_id3", result.getExecutorNode()); - PersistentTask lastTask = createJobTask(5L, "job_id6", "_node_id3", null); - taskMap.put(5L, lastTask); + PersistentTask lastTask = createJobTask("5L", "job_id6", "_node_id3", null, 6L); + taskMap.put("5L", lastTask); tasks = new PersistentTasksCustomMetaData(6L, taskMap); csBuilder = ClusterState.builder(cs); @@ -224,8 +230,8 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTask<>(lastTask, new Assignment("_node_id3", "test assignment"))); - tasks = new PersistentTasksCustomMetaData(6L, taskMap); + taskMap.put("5L", new PersistentTask<>(lastTask, 7L, new Assignment("_node_id3", "test assignment"))); + tasks = new PersistentTasksCustomMetaData(7L, taskMap); csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); @@ -234,8 +240,8 @@ public class OpenJobActionTests extends ESTestCase { assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTask<>(lastTask, (Task.Status) null)); - tasks = new PersistentTasksCustomMetaData(6L, taskMap); + taskMap.put("5L", new PersistentTask<>(lastTask, (Task.Status) null)); + tasks = new PersistentTasksCustomMetaData(8L, taskMap); csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); @@ -280,11 +286,13 @@ public class OpenJobActionTests extends ESTestCase { assertEquals(indexToRemove, result.get(0)); } - public static PersistentTask createJobTask(long id, String jobId, String nodeId, JobState jobState) { + public static PersistentTask createJobTask(String id, String jobId, String nodeId, JobState jobState, + long allocationId) { PersistentTask task = - new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), new Assignment(nodeId, "test assignment")); + new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), allocationId, + new Assignment(nodeId, "test assignment")); if (jobState != null) { - task = new PersistentTask<>(task, new JobTaskStatus(jobState, 0L)); + task = new PersistentTask<>(task, new JobTaskStatus(jobState, allocationId)); } return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index f7a12765e51..4efeb8c65a5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -70,8 +70,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED); - PersistentTask task = createJobTask(0L, job.getId(), "node_id", jobState); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), "node_id", jobState, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -91,8 +91,8 @@ public class StartDatafeedActionTests extends ESTestCase { assertEquals("cannot start datafeed [datafeed_id], because job's [job_id] state is [" + jobState + "] while state [opened] is required", result.getExplanation()); - task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 1L); + tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -124,8 +124,8 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); List> states = new ArrayList<>(2); states.add(new Tuple<>(0, ShardRoutingState.UNASSIGNED)); @@ -164,8 +164,8 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); List> states = new ArrayList<>(2); states.add(new Tuple<>(0, ShardRoutingState.STARTED)); @@ -203,8 +203,8 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() @@ -234,8 +234,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("foo"))); String nodeId = randomBoolean() ? "node_id2" : null; - PersistentTask task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), nodeId, JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -255,8 +255,8 @@ public class StartDatafeedActionTests extends ESTestCase { assertEquals("cannot start datafeed [datafeed_id], job [job_id] is unassigned or unassigned to a non existing node", result.getExplanation()); - task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + task = createJobTask("0L", job.getId(), "node_id1", JobState.OPENED, 0L); + tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder() .putCustom(MlMetadata.TYPE, mlMetadata.build()) @@ -284,8 +284,8 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); PersistentTask task = - new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), INITIAL_ASSIGNMENT); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap(0L, task)); + new PersistentTask<>("0L", OpenJobAction.NAME, new OpenJobAction.Request("job_id"), 0L, INITIAL_ASSIGNMENT); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(0L, Collections.singletonMap("0L", task)); DatafeedConfig datafeedConfig1 = DatafeedManagerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) @@ -303,14 +303,14 @@ public class StartDatafeedActionTests extends ESTestCase { .putDatafeed(datafeedConfig) .build(); - PersistentTask jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); + PersistentTask jobTask = createJobTask("0L", "job_id", "node_id", JobState.OPENED, 0L); PersistentTask datafeedTask = - new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - new Assignment("node_id", "test assignment")); + new PersistentTask<>("1L", StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + 1L, new Assignment("node_id", "test assignment")); datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); - Map> taskMap = new HashMap<>(); - taskMap.put(0L, jobTask); - taskMap.put(1L, datafeedTask); + Map> taskMap = new HashMap<>(); + taskMap.put("0L", jobTask); + taskMap.put("1L", datafeedTask); PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(2L, taskMap); Exception e = expectThrows(ElasticsearchStatusException.class, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index 089c9d15c8f..f428168897c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -54,10 +54,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testValidate() { - PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); Job job = createDatafeedJob().build(new Date()); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); @@ -75,9 +75,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe public void testValidate_alreadyStopped() { PersistentTasksCustomMetaData tasks; if (randomBoolean()) { - PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); - tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(1L, task)); + PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo2", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); + tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("1L", task)); } else { tasks = randomBoolean() ? null : new PersistentTasksCustomMetaData(0L, Collections.emptyMap()); } @@ -94,34 +94,34 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testResolveAll() { - Map> taskMap = new HashMap<>(); + Map> taskMap = new HashMap<>(); Builder mlMetadataBuilder = new MlMetadata.Builder(); - PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_1", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + PersistentTask task = new PersistentTask("1L", StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_1", 0L), 1L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); - taskMap.put(1L, task); + taskMap.put("1L", task); Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask(2L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask("2L", StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_2", 0L), 2L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STOPPED); - taskMap.put(2L, task); + taskMap.put("2L", task); job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - task = new PersistentTask(3L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("datafeed_3", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask("3L", StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_3", 0L), 3L, new PersistentTasksCustomMetaData.Assignment("node_id", "")); task = new PersistentTask<>(task, DatafeedState.STARTED); - taskMap.put(3L, task); + taskMap.put("3L", task); job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, taskMap); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(3L, taskMap); MlMetadata mlMetadata = mlMetadataBuilder.build(); assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 3da00ef5a16..74cbd4a50bf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -98,8 +98,8 @@ public class DatafeedManagerTests extends ESTestCase { Job job = createDatafeedJob().build(new Date()); mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); - PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask("0L", job.getId(), "node_id", JobState.OPENED, 0L); + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, Collections.singletonMap("0L", task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskRequestTests.java index 8d56c455af7..3cd4c67bbf3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskRequestTests.java @@ -5,14 +5,16 @@ */ package org.elasticsearch.xpack.persistent; -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Request; import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Request; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; public class CancelPersistentTaskRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { - return new Request(randomLong()); + return new Request(randomAsciiOfLength(10)); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java index 0305613ba33..fe634319eec 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -389,11 +390,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { MetaData.Builder metaData, PersistentTasksCustomMetaData.Builder tasks, Assignment assignment, String param) { return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksCustomMetaData.TYPE, - tasks.addTask(randomAlphaOfLength(10), new TestRequest(param), assignment).build())); + tasks.addTask(UUIDs.base64UUID(), randomAlphaOfLength(10), new TestRequest(param), assignment).build())); } private void addTask(PersistentTasksCustomMetaData.Builder tasks, String action, String param, String node) { - tasks.addTask(action, new TestRequest(param), new Assignment(node, "explanation: " + action)); + tasks.addTask(UUIDs.base64UUID(), action, new TestRequest(param), new Assignment(node, "explanation: " + action)); } private DiscoveryNode newNode(String nodeId) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java index ada1842f1c4..f6c0e594e56 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaDataTests.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData.Custom; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.Writeable; @@ -44,11 +46,12 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ int numberOfTasks = randomInt(10); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); for (int i = 0; i < numberOfTasks; i++) { - tasks.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), + String taskId = UUIDs.base64UUID(); + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment()); if (randomBoolean()) { // From time to time update status - tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAlphaOfLength(10))); + tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10))); } } return tasks.build(); @@ -71,31 +74,30 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ @Override protected Custom makeTestChanges(Custom testInstance) { - PersistentTasksCustomMetaData tasksInProgress = (PersistentTasksCustomMetaData) testInstance; - Builder builder = new Builder(); + Builder builder = new Builder((PersistentTasksCustomMetaData) testInstance); switch (randomInt(3)) { case 0: addRandomTask(builder); break; case 1: - if (tasksInProgress.tasks().isEmpty()) { + if (builder.getCurrentTaskIds().isEmpty()) { addRandomTask(builder); } else { - builder.reassignTask(pickRandomTask(tasksInProgress), randomAssignment()); + builder.reassignTask(pickRandomTask(builder), randomAssignment()); } break; case 2: - if (tasksInProgress.tasks().isEmpty()) { + if (builder.getCurrentTaskIds().isEmpty()) { addRandomTask(builder); } else { - builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); + builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); } break; case 3: - if (tasksInProgress.tasks().isEmpty()) { + if (builder.getCurrentTaskIds().isEmpty()) { addRandomTask(builder); } else { - builder.removeTask(pickRandomTask(tasksInProgress)); + builder.removeTask(pickRandomTask(builder)); } break; } @@ -134,13 +136,14 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ return builder; } - private Builder addRandomTask(Builder builder) { - builder.addTask(TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment()); - return builder; + private String addRandomTask(Builder builder) { + String taskId = UUIDs.base64UUID(); + builder.addTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest(randomAlphaOfLength(10)), randomAssignment()); + return taskId; } - private long pickRandomTask(PersistentTasksCustomMetaData testInstance) { - return randomFrom(new ArrayList<>(testInstance.tasks())).getId(); + private String pickRandomTask(PersistentTasksCustomMetaData.Builder testInstance) { + return randomFrom(new ArrayList<>(testInstance.getCurrentTaskIds())); } @Override @@ -189,7 +192,7 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ public void testBuilder() { PersistentTasksCustomMetaData persistentTasks = null; - long lastKnownTask = -1; + String lastKnownTask = ""; for (int i = 0; i < randomIntBetween(10, 100); i++) { final Builder builder; if (randomBoolean()) { @@ -199,54 +202,46 @@ public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializ } boolean changed = false; for (int j = 0; j < randomIntBetween(1, 10); j++) { - switch (randomInt(5)) { + switch (randomInt(4)) { case 0: - lastKnownTask = addRandomTask(builder).getCurrentId(); + lastKnownTask = addRandomTask(builder); changed = true; break; case 1: if (builder.hasTask(lastKnownTask)) { changed = true; + builder.reassignTask(lastKnownTask, randomAssignment()); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.reassignTask(fLastKnownTask, randomAssignment())); } - builder.reassignTask(lastKnownTask, randomAssignment()); break; case 2: if (builder.hasTask(lastKnownTask)) { - PersistentTask task = builder.build().getTask(lastKnownTask); - if (randomBoolean()) { - // Trying to reassign to the same node - builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment()); - } else { - // Trying to reassign to a different node - Assignment randomAssignment = randomAssignment(); - builder.assignTask(lastKnownTask, (s, request) -> randomAssignment); - // should change if the task was unassigned and was reassigned to a different node or started - if ((task.isAssigned() == false && randomAssignment.isAssigned())) { - changed = true; - } - } + changed = true; + builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); } else { - // task doesn't exist - shouldn't change - builder.assignTask(lastKnownTask, (s, request) -> randomAssignment()); + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null)); } break; case 3: if (builder.hasTask(lastKnownTask)) { changed = true; + builder.removeTask(lastKnownTask); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.removeTask(fLastKnownTask)); } - builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); break; case 4: if (builder.hasTask(lastKnownTask)) { changed = true; + builder.finishTask(lastKnownTask); + } else { + String fLastKnownTask = lastKnownTask; + expectThrows(ResourceNotFoundException.class, () -> builder.finishTask(fLastKnownTask)); } - builder.removeTask(lastKnownTask); - break; - case 5: - if (builder.hasTask(lastKnownTask)) { - changed = true; - } - builder.finishTask(lastKnownTask); break; } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java index 14aca8af442..6b0d574d387 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -44,17 +45,18 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { public void testFullClusterRestart() throws Exception { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); int numberOfTasks = randomIntBetween(1, 10); - long[] taskIds = new long[numberOfTasks]; + String[] taskIds = new String[numberOfTasks]; List>> futures = new ArrayList<>(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); - service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + taskIds[i] = UUIDs.base64UUID(); + service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { - taskIds[i] = futures.get(i).get().getId(); + assertThat(futures.get(i).get().getId(), equalTo(taskIds[i])); } PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index 3982f525b44..91355980309 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; @@ -61,8 +63,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get().getId(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + long allocationId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() @@ -72,7 +74,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { .get().getTasks().get(0); logger.info("Found running task with id {} and parent {}", firstRunningTask.getId(), firstRunningTask.getParentTaskId()); // Verifying parent - assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId)); + assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId)); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); logger.info("Failing the running task"); @@ -91,8 +93,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionCompletion() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get().getId(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + long taskId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() @@ -112,8 +114,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { PlainActionFuture> future = new PlainActionFuture<>(); TestRequest testRequest = new TestRequest("Blah"); testRequest.setExecutorNodeAttr("test"); - persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future); - long taskId = future.get().getId(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testRequest, future); + String taskId = future.get().getId(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); String newNode = internalCluster().startNode(nodeSettings); @@ -146,8 +148,8 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get().getId(); + persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + String taskId = future.get().getId(); assertBusy(() -> { // Wait for the task to start @@ -201,6 +203,38 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(future2.get(), nullValue()); } + public void testCreatePersistentTaskWithDuplicateId() throws Exception { + PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); + PlainActionFuture> future = new PlainActionFuture<>(); + String taskId = UUIDs.base64UUID(); + persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + future.get(); + + PlainActionFuture> future2 = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future2); + assertThrows(future2, ResourceAlreadyExistsException.class); + + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() + .getTasks().size(), equalTo(1)); + }); + + TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get().getTasks().get(0); + + logger.info("Completing the running task"); + // Fail the running task and make sure it restarts properly + assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) + .get().getTasks().size(), equalTo(1)); + + logger.info("Waiting for persistent task with id {} to disappear", firstRunningTask.getId()); + assertBusy(() -> { + // Wait for the task to disappear completely + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), + empty()); + }); + } private void stopOrCancelTask(TaskId taskId) { if (randomBoolean()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java index c8761e9a618..90e5eaaa784 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.test.AbstractStreamableTestCase; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -19,9 +20,9 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest protected PersistentTaskResponse createTestInstance() { if (randomBoolean()) { return new PersistentTaskResponse( - new PersistentTask(randomLong(), randomAsciiOfLength(10), + new PersistentTask(UUIDs.base64UUID(), randomAsciiOfLength(10), new TestPersistentTasksPlugin.TestRequest("test"), - PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); + randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); } else { return new PersistentTaskResponse(null); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 4081cd750db..41b490e2b19 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -76,11 +76,11 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.addTask("test_action", new TestRequest("other_" + i), + tasks.addTask(UUIDs.base64UUID(), "test_action", new TestRequest("other_" + i), new Assignment("other_node_" + randomInt(nonLocalNodesCount), "test assignment on other node")); if (added == false && randomBoolean()) { added = true; - tasks.addTask("test", new TestRequest("this_param"), + tasks.addTask(UUIDs.base64UUID(), "test", new TestRequest("this_param"), new Assignment("this_node", "test assignment on this node")); } } @@ -118,8 +118,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Finish both tasks executor.get(0).task.markAsFailed(new RuntimeException()); executor.get(1).task.markAsCompleted(); - long failedTaskId = executor.get(0).task.getParentTaskId().getId(); - long finishedTaskId = executor.get(1).task.getParentTaskId().getId(); + String failedTaskId = executor.get(0).task.getPersistentTaskId(); + String finishedTaskId = executor.get(1).task.getPersistentTaskId(); executor.clear(); // Add task on some other node @@ -158,7 +158,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } @Override - public void sendCompletionNotification(long taskId, Exception failure, ActionListener> listener) { + public void sendCompletionNotification(String taskId, Exception failure, ActionListener> listener) { fail("Shouldn't be called during Cluster State cancellation"); } }; @@ -184,8 +184,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // Check the the task is know to the task manager assertThat(taskManager.getTasks().size(), equalTo(1)); - Task runningTask = taskManager.getTasks().values().iterator().next(); - long persistentId = runningTask.getParentTaskId().getId(); + AllocatedPersistentTask runningTask = (AllocatedPersistentTask)taskManager.getTasks().values().iterator().next(); + String persistentId = runningTask.getPersistentTaskId(); long localId = runningTask.getId(); // Make sure it returns correct status Task.Status status = runningTask.getStatus(); @@ -227,10 +227,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, - builder.addTask(action, request, new Assignment(node, "test assignment")).build())).build(); + builder.addTask(UUIDs.base64UUID(), action, request, new Assignment(node, "test assignment")).build())).build(); } - private ClusterState reallocateTask(ClusterState state, long taskId, String node) { + private ClusterState reallocateTask(ClusterState state, String taskId, String node) { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); assertTrue(builder.hasTask(taskId)); @@ -238,7 +238,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build(); } - private ClusterState removeTask(ClusterState state, long taskId) { + private ClusterState removeTask(ClusterState state, String taskId) { PersistentTasksCustomMetaData.Builder builder = PersistentTasksCustomMetaData.builder(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); assertTrue(builder.hasTask(taskId)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/RestartPersistentTaskRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/RestartPersistentTaskRequestTests.java index 8e992dc0087..2768643149d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/RestartPersistentTaskRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/RestartPersistentTaskRequestTests.java @@ -5,14 +5,14 @@ */ package org.elasticsearch.xpack.persistent; -import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Request; import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Request; public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCase { @Override protected Request createTestInstance() { - return new Request(randomLong(), null); + return new Request(randomAlphaOfLength(10), null); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java index 037773dc2af..436ef2160b1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request; @@ -28,7 +29,7 @@ public class StartPersistentActionRequestTests extends AbstractStreamableTestCas if (randomBoolean()) { testRequest.setExecutorNodeAttr(randomAlphaOfLengthBetween(1, 20)); } - return new Request(randomAlphaOfLengthBetween(1, 20), new TestRequest()); + return new Request(UUIDs.base64UUID(), randomAlphaOfLengthBetween(1, 20), new TestRequest()); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java index 86b19c28602..f5c3a38886f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractStreamableTestCase; @@ -17,7 +18,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase @Override protected Request createTestInstance() { - return new Request(randomLong(), randomLong(), new Status(randomAlphaOfLength(10))); + return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10))); } @Override