From 377c1ec2b42ee99540a57a8b7d168f084a72aff2 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 27 Feb 2017 11:37:42 -0700 Subject: [PATCH] Simplify names of PersistentTasks-related classes PersistentTask -> NodePersistentTask PersistentTasksInProgress -> PersistentTasks PersistentTaskInProgress -> PersistentTask Original commit: elastic/x-pack-elasticsearch@0947dbca56e8d95a4e636cb8574668450ace90e6 --- .../xpack/ml/MachineLearning.java | 10 +- .../elasticsearch/xpack/ml/MlMetadata.java | 46 ++++--- .../xpack/ml/action/CloseJobAction.java | 26 ++-- .../xpack/ml/action/DeleteDatafeedAction.java | 8 +- .../ml/action/GetDatafeedsStatsAction.java | 8 +- .../xpack/ml/action/GetJobsStatsAction.java | 10 +- .../xpack/ml/action/OpenJobAction.java | 26 ++-- .../xpack/ml/action/StartDatafeedAction.java | 25 ++-- .../xpack/ml/action/StopDatafeedAction.java | 9 +- .../ml/action/TransportJobTaskAction.java | 8 +- .../xpack/ml/action/UpdateDatafeedAction.java | 8 +- .../xpack/ml/action/UpdateJobAction.java | 4 +- .../xpack/ml/datafeed/DatafeedJobRunner.java | 2 +- .../xpack/ml/job/JobManager.java | 8 +- .../xpack/ml/utils/DatafeedStateObserver.java | 4 +- .../xpack/ml/utils/JobStateObserver.java | 4 +- ...stentTask.java => NodePersistentTask.java} | 6 +- .../PersistentActionCoordinator.java | 20 +-- .../persistent/PersistentActionExecutor.java | 2 +- .../persistent/PersistentActionRequest.java | 2 +- .../PersistentTaskClusterService.java | 35 +++--- ...ksInProgress.java => PersistentTasks.java} | 116 +++++++++--------- .../persistent/TransportPersistentAction.java | 12 +- .../xpack/persistent/package-info.java | 6 +- .../xpack/ml/MlMetadataTests.java | 34 ++--- .../xpack/ml/action/CloseJobActionTests.java | 18 +-- .../xpack/ml/action/OpenJobActionTests.java | 78 ++++++------ .../ml/action/StartDatafeedActionTests.java | 62 +++++----- .../StopDatafeedActionRequestTests.java | 24 ++-- .../ml/datafeed/DatafeedJobRunnerTests.java | 10 +- .../integration/BasicDistributedJobsIT.java | 30 ++--- .../integration/MlDistributedFailureIT.java | 8 +- .../xpack/ml/integration/TooManyJobsIT.java | 6 +- .../PersistentActionCoordinatorTests.java | 32 ++--- .../PersistentActionFullRestartIT.java | 18 +-- .../xpack/persistent/PersistentActionIT.java | 20 +-- .../PersistentTaskClusterServiceTests.java | 72 +++++------ ...ssTests.java => PersistentTasksTests.java} | 46 +++---- .../TestPersistentActionPlugin.java | 14 +-- 39 files changed, 436 insertions(+), 441 deletions(-) rename plugin/src/main/java/org/elasticsearch/xpack/persistent/{PersistentTask.java => NodePersistentTask.java} (87%) rename plugin/src/main/java/org/elasticsearch/xpack/persistent/{PersistentTasksInProgress.java => PersistentTasks.java} (80%) rename plugin/src/test/java/org/elasticsearch/xpack/persistent/{PersistentTasksInProgressTests.java => PersistentTasksTests.java} (85%) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index aedc3142989..9025b6beb19 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -127,7 +127,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -212,8 +212,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { // Custom metadata new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new), new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new), - new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), - new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new), + new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom), // Persistent action requests new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new), @@ -233,8 +233,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { // Custom metadata new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"), parser -> MlMetadata.ML_METADATA_PARSER.parse(parser, null).build()), - new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE), - PersistentTasksInProgress::fromXContent), + new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasks.TYPE), + PersistentTasks::fromXContent), // Persistent action requests new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(StartDatafeedAction.NAME), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java index ca6182df4ea..67ab188aad7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlMetadata.java @@ -32,8 +32,8 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.Collection; @@ -236,7 +236,7 @@ public class MlMetadata implements MetaData.Custom { return this; } - public Builder deleteJob(String jobId, PersistentTasksInProgress tasks) { + public Builder deleteJob(String jobId, PersistentTasks tasks) { Optional datafeed = getDatafeedByJobId(jobId); if (datafeed.isPresent()) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] while datafeed [" @@ -281,14 +281,14 @@ public class MlMetadata implements MetaData.Custom { } } - public Builder updateDatafeed(DatafeedUpdate update, PersistentTasksInProgress persistentTasksInProgress) { + public Builder updateDatafeed(DatafeedUpdate update, PersistentTasks persistentTasks) { String datafeedId = update.getId(); DatafeedConfig oldDatafeedConfig = datafeeds.get(datafeedId); if (oldDatafeedConfig == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); } checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, - DatafeedState.STARTED), datafeedId, persistentTasksInProgress); + DatafeedState.STARTED), datafeedId, persistentTasks); DatafeedConfig newDatafeedConfig = update.apply(oldDatafeedConfig); if (newDatafeedConfig.getJobId().equals(oldDatafeedConfig.getJobId()) == false) { checkJobIsAvailableForDatafeed(newDatafeedConfig.getJobId()); @@ -299,13 +299,13 @@ public class MlMetadata implements MetaData.Custom { return this; } - public Builder removeDatafeed(String datafeedId, PersistentTasksInProgress persistentTasksInProgress) { + public Builder removeDatafeed(String datafeedId, PersistentTasks persistentTasks) { DatafeedConfig datafeed = datafeeds.get(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); } checkDatafeedIsStopped(() -> Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, datafeedId, - DatafeedState.STARTED), datafeedId, persistentTasksInProgress); + DatafeedState.STARTED), datafeedId, persistentTasks); datafeeds.remove(datafeedId); return this; } @@ -314,13 +314,13 @@ public class MlMetadata implements MetaData.Custom { return datafeeds.values().stream().filter(s -> s.getJobId().equals(jobId)).findFirst(); } - private void checkDatafeedIsStopped(Supplier msg, String datafeedId, PersistentTasksInProgress persistentTasksInProgress) { - if (persistentTasksInProgress != null) { - Predicate> predicate = t -> { + private void checkDatafeedIsStopped(Supplier msg, String datafeedId, PersistentTasks persistentTasks) { + if (persistentTasks != null) { + Predicate> predicate = t -> { StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); return storedRequest.getDatafeedId().equals(datafeedId); }; - if (persistentTasksInProgress.tasksExist(StartDatafeedAction.NAME, predicate)) { + if (persistentTasks.tasksExist(StartDatafeedAction.NAME, predicate)) { throw ExceptionsHelper.conflictStatusException(msg.get()); } } @@ -344,7 +344,7 @@ public class MlMetadata implements MetaData.Custom { return new MlMetadata(jobs, datafeeds); } - public void markJobAsDeleted(String jobId, PersistentTasksInProgress tasks) { + public void markJobAsDeleted(String jobId, PersistentTasks tasks) { Job job = jobs.get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); @@ -371,14 +371,13 @@ public class MlMetadata implements MetaData.Custom { } @Nullable - public static PersistentTasksInProgress.PersistentTaskInProgress getJobTask(String jobId, - @Nullable PersistentTasksInProgress tasks) { + public static PersistentTask getJobTask(String jobId, @Nullable PersistentTasks tasks) { if (tasks != null) { - Predicate> p = t -> { + Predicate> p = t -> { OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); return storedRequest.getJobId().equals(jobId); }; - for (PersistentTasksInProgress.PersistentTaskInProgress task : tasks.findTasks(OpenJobAction.NAME, p)) { + for (PersistentTask task : tasks.findTasks(OpenJobAction.NAME, p)) { return task; } } @@ -386,22 +385,21 @@ public class MlMetadata implements MetaData.Custom { } @Nullable - public static PersistentTasksInProgress.PersistentTaskInProgress getDatafeedTask(String datafeedId, - @Nullable PersistentTasksInProgress tasks) { + public static PersistentTask getDatafeedTask(String datafeedId, @Nullable PersistentTasks tasks) { if (tasks != null) { - Predicate> p = t -> { + Predicate> p = t -> { StartDatafeedAction.Request storedRequest = (StartDatafeedAction.Request) t.getRequest(); return storedRequest.getDatafeedId().equals(datafeedId); }; - for (PersistentTasksInProgress.PersistentTaskInProgress task : tasks.findTasks(StartDatafeedAction.NAME, p)) { + for (PersistentTask task : tasks.findTasks(StartDatafeedAction.NAME, p)) { return task; } } return null; } - public static JobState getJobState(String jobId, @Nullable PersistentTasksInProgress tasks) { - PersistentTasksInProgress.PersistentTaskInProgress task = getJobTask(jobId, tasks); + public static JobState getJobState(String jobId, @Nullable PersistentTasks tasks) { + PersistentTask task = getJobTask(jobId, tasks); if (task != null && task.getStatus() != null) { JobState jobTaskState = (JobState) task.getStatus(); if (jobTaskState != null) { @@ -412,8 +410,8 @@ public class MlMetadata implements MetaData.Custom { return JobState.CLOSED; } - public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksInProgress tasks) { - PersistentTasksInProgress.PersistentTaskInProgress task = getDatafeedTask(datafeedId, tasks); + public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasks tasks) { + PersistentTask task = getDatafeedTask(datafeedId, tasks); if (task != null && task.getStatus() != null) { return (DatafeedState) task.getStatus(); } else { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 4c44766e62b..4714de069be 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -41,8 +41,8 @@ import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.Date; @@ -278,19 +278,19 @@ public class CloseJobAction extends Action validateAndFindTask(String jobId, ClusterState state) { + static PersistentTask validateAndFindTask(String jobId, ClusterState state) { MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); if (mlMetadata.getJobs().containsKey(jobId) == false) { throw ExceptionsHelper.missingJobException(jobId); } - PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE); if (tasks != null) { - Predicate> p = t -> { + Predicate> p = t -> { OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); return storedRequest.getJobId().equals(jobId); }; - for (PersistentTaskInProgress task : tasks.findTasks(OpenJobAction.NAME, p)) { + for (PersistentTask task : tasks.findTasks(OpenJobAction.NAME, p)) { OpenJobAction.Request storedRequest = (OpenJobAction.Request) task.getRequest(); if (storedRequest.getJobId().equals(jobId)) { JobState jobState = (JobState) task.getStatus(); @@ -307,13 +307,13 @@ public class CloseJobAction extends Action task = validateAndFindTask(jobId, currentState); - PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); - Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); - PersistentTaskInProgress taskToUpdate = currentTasks.getTask(task.getId()); - taskToUpdate = new PersistentTaskInProgress<>(taskToUpdate, JobState.CLOSING); + PersistentTask task = validateAndFindTask(jobId, currentState); + PersistentTasks currentTasks = currentState.getMetaData().custom(PersistentTasks.TYPE); + Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); + PersistentTask taskToUpdate = currentTasks.getTask(task.getId()); + taskToUpdate = new PersistentTask<>(taskToUpdate, JobState.CLOSING); updatedTasks.put(taskToUpdate.getId(), taskToUpdate); - PersistentTasksInProgress newTasks = new PersistentTasksInProgress(currentTasks.getCurrentId(), updatedTasks); + PersistentTasks newTasks = new PersistentTasks(currentTasks.getCurrentId(), updatedTasks); MlMetadata mlMetadata = currentState.metaData().custom(MlMetadata.TYPE); Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); @@ -325,7 +325,7 @@ public class CloseJobAction extends Action results = new HashMap<>(); MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasksInProgress = state.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = state.getMetaData().custom(PersistentTasks.TYPE); if (request.getDatafeedId().equals(ALL) == false && mlMetadata.getDatafeed(request.getDatafeedId()) == null) { throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); } for (DatafeedConfig datafeedConfig : mlMetadata.getDatafeeds().values()) { if (request.getDatafeedId().equals(ALL) || datafeedConfig.getId().equals(request.getDatafeedId())) { - PersistentTaskInProgress task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress); + PersistentTask task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasksInProgress); DatafeedState datafeedState = MlMetadata.getDatafeedState(request.getDatafeedId(), tasksInProgress); DiscoveryNode node = null; String explanation = null; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java index c229ee94e7d..df2fce15130 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java @@ -44,7 +44,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.io.IOException; import java.util.ArrayList; @@ -394,10 +394,10 @@ public class GetJobsStatsAction extends Action> stats = processManager.getStatistics(jobId); if (stats.isPresent()) { - PersistentTasksInProgress.PersistentTaskInProgress pTask = MlMetadata.getJobTask(jobId, tasks); + PersistentTasks.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); DiscoveryNode node = state.nodes().get(pTask.getExecutorNode()); JobState jobState = MlMetadata.getJobState(jobId, tasks); String assignmentExplanation = pTask.getAssignment().getExplanation(); @@ -420,13 +420,13 @@ public class GetJobsStatsAction extends Action jobStats = new AtomicArray<>(jobIds.size()); - PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE); for (int i = 0; i < jobIds.size(); i++) { int slot = i; String jobId = jobIds.get(i); gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { JobState jobState = MlMetadata.getJobState(request.jobId, tasks); - PersistentTasksInProgress.PersistentTaskInProgress pTask = MlMetadata.getJobTask(jobId, tasks); + PersistentTasks.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); String assignmentExplanation = null; if (pTask != null) { assignmentExplanation = pTask.getAssignment().getExplanation(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index c01f2302e95..7989082a0c1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -54,10 +54,10 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; -import org.elasticsearch.xpack.persistent.PersistentTask; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.NodePersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; @@ -218,7 +218,7 @@ public class OpenJobAction extends Action cancelHandler; @@ -320,12 +320,12 @@ public class OpenJobAction extends Action listener) { + protected void nodeOperation(NodePersistentTask task, Request request, ActionListener listener) { autodetectProcessManager.setJobState(task.getPersistentTaskId(), JobState.OPENING, e1 -> { if (e1 != null) { listener.onFailure(e1); @@ -376,7 +376,7 @@ public class OpenJobAction extends Action task = MlMetadata.getJobTask(jobId, tasks); + PersistentTask task = MlMetadata.getJobTask(jobId, tasks); JobState jobState = MlMetadata.getJobState(jobId, tasks); if (task != null && jobState == JobState.OPENED) { if (task.isAssigned() == false) { @@ -418,7 +418,7 @@ public class OpenJobAction extends Action reasons = new LinkedList<>(); DiscoveryNode minLoadedNode = null; - PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks persistentTasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { Map nodeAttributes = node.getAttributes(); String maxNumberOfOpenJobsStr = nodeAttributes.get(AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.getKey()); @@ -431,9 +431,9 @@ public class OpenJobAction extends Action { + if (persistentTasks != null) { + numberOfAssignedJobs = persistentTasks.getNumberOfTasksOnNode(node.getId(), OpenJobAction.NAME); + numberOfAllocatingJobs = persistentTasks.findTasks(OpenJobAction.NAME, task -> { if (node.getId().equals(task.getExecutorNode()) == false) { return false; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 6932158d303..8d962e092af 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -53,10 +53,10 @@ import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; -import org.elasticsearch.xpack.persistent.PersistentTask; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.NodePersistentTask; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.TransportPersistentAction; import java.io.IOException; @@ -225,7 +225,7 @@ public class StartDatafeedAction } } - public static class DatafeedTask extends PersistentTask { + public static class DatafeedTask extends NodePersistentTask { private final String datafeedId; private final long startTime; @@ -327,14 +327,15 @@ public class StartDatafeedAction @Override public void validate(Request request, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); DiscoveryNodes nodes = clusterState.getNodes(); StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes); } @Override - protected void nodeOperation(PersistentTask persistentTask, Request request, ActionListener listener) { - DatafeedTask datafeedTask = (DatafeedTask) persistentTask; + protected void nodeOperation(NodePersistentTask nodePersistentTask, Request request, + ActionListener listener) { + DatafeedTask datafeedTask = (DatafeedTask) nodePersistentTask; datafeedJobRunner.run(datafeedTask, (error) -> { if (error != null) { @@ -372,7 +373,7 @@ public class StartDatafeedAction } - static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks, DiscoveryNodes nodes) { + static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasks tasks, DiscoveryNodes nodes) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw ExceptionsHelper.missingDatafeedException(datafeedId); @@ -388,7 +389,7 @@ public class StartDatafeedAction RestStatus.CONFLICT, JobState.OPENED, jobState); } - PersistentTaskInProgress datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); if (datafeedTask != null && datafeedState == DatafeedState.STARTED) { if (datafeedTask.isAssigned() == false) { @@ -410,11 +411,11 @@ public class StartDatafeedAction public static Assignment selectNode(Logger logger, String datafeedId, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); DiscoveryNodes nodes = clusterState.getNodes(); - PersistentTaskInProgress jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); + PersistentTask jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks); if (jobTask == null) { String reason = "cannot start datafeed [" + datafeed.getId() + "], job task doesn't yet exist"; logger.debug(reason); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 66c9cee2b6d..30b3d462518 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -44,7 +44,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.DatafeedStateObserver; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.List; @@ -215,7 +216,7 @@ public class StopDatafeedAction ClusterState state = clusterService.state(); MetaData metaData = state.metaData(); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = metaData.custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = metaData.custom(PersistentTasks.TYPE); String nodeId = validateAndReturnNodeId(request.getDatafeedId(), mlMetadata, tasks); request.setNodes(nodeId); ActionListener finalListener = @@ -260,12 +261,12 @@ public class StopDatafeedAction } } - static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasksInProgress tasks) { + static String validateAndReturnNodeId(String datafeedId, MlMetadata mlMetadata, PersistentTasks tasks) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId)); } - PersistentTasksInProgress.PersistentTaskInProgress task = MlMetadata.getDatafeedTask(datafeedId, tasks); + PersistentTask task = MlMetadata.getDatafeedTask(datafeedId, tasks); if (task == null || task.getStatus() != DatafeedState.STARTED) { throw new ElasticsearchStatusException("datafeed already stopped, expected datafeed state [{}], but got [{}]", RestStatus.CONFLICT, DatafeedState.STARTED, DatafeedState.STOPPED); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index e5a3ce90d69..9812f1ee56d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -30,7 +30,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.io.IOException; import java.util.List; @@ -62,8 +62,8 @@ public abstract class TransportJobTaskAction jobTask = MlMetadata.getJobTask(jobId, tasks); + PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE); + PersistentTasks.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask == null || jobTask.isAssigned() == false) { listener.onFailure( new ElasticsearchStatusException("job [" + jobId + "] state is [" + JobState.CLOSED + "], but must be [" + JobState.OPENED + "] to perform requested action", RestStatus.CONFLICT)); @@ -76,7 +76,7 @@ public abstract class TransportJobTaskAction listener) { ClusterState state = clusterService.state(); - PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); JobState jobState = MlMetadata.getJobState(request.getJobId(), tasks); if (jobState == JobState.OPENED) { innerTaskOperation(request, task, listener); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java index eacb89800ab..bec24c383b7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.io.IOException; import java.util.Objects; @@ -161,10 +161,10 @@ public class UpdateDatafeedAction extends Action new Semaphore(1)).acquire(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 4343f5af91d..d5951e880c1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import java.time.Duration; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 25eaba2f0ed..8af8d85dbd2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobStorageDeletionTask; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.util.Collections; import java.util.List; @@ -127,7 +127,7 @@ public class JobManager extends AbstractComponent { } public JobState getJobState(String jobId) { - PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterService.state().getMetaData().custom(PersistentTasks.TYPE); return MlMetadata.getJobState(jobId, tasks); } @@ -281,7 +281,7 @@ public class JobManager extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { MlMetadata.Builder builder = createMlMetadataBuilder(currentState); - builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksInProgress.TYPE)); + builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasks.TYPE)); return buildNewClusterState(currentState, builder); } }); @@ -307,7 +307,7 @@ public class JobManager extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { MlMetadata currentMlMetadata = currentState.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = currentState.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = currentState.metaData().custom(PersistentTasks.TYPE); MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); builder.markJobAsDeleted(jobId, tasks); return buildNewClusterState(currentState, builder); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java index 9b4d1b6ad2e..ce8251d2883 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/DatafeedStateObserver.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.util.function.Consumer; import java.util.function.Predicate; @@ -35,7 +35,7 @@ public class DatafeedStateObserver { ClusterStateObserver observer = new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext()); Predicate predicate = (newState) -> { - PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = newState.getMetaData().custom(PersistentTasks.TYPE); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); return datafeedState == expectedState; }; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java index 20bb8f48d6b..ce62ecc89ec 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java @@ -16,7 +16,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; import java.util.function.Consumer; import java.util.function.Predicate; @@ -87,7 +87,7 @@ public class JobStateObserver { @Override public boolean test(ClusterState newState) { - PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = newState.getMetaData().custom(PersistentTasks.TYPE); JobState jobState = MlMetadata.getJobState(jobId, tasks); if (jobState == JobState.FAILED) { failed = true; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java similarity index 87% rename from plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java rename to plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java index bf5be288335..f02aefb7117 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/NodePersistentTask.java @@ -10,14 +10,14 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.TaskId; /** - * Task that returns additional state information + * Represents a executor node operation that corresponds to a persistent task */ -public class PersistentTask extends CancellableTask { +public class NodePersistentTask extends CancellableTask { private Provider statusProvider; private long persistentTaskId; - public PersistentTask(long id, String type, String action, String description, TaskId parentTask) { + public NodePersistentTask(long id, String type, String action, String description, TaskId parentTask) { super(id, type, action, description, parentTask); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java index c2e1fd6e937..34156bb5017 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java @@ -24,7 +24,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.transport.TransportResponse.Empty; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.HashMap; @@ -62,15 +62,15 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl @Override public void clusterChanged(ClusterChangedEvent event) { - PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE); + PersistentTasks previousTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE); if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) { // We have some changes let's check if they are related to our node String localNodeId = event.state().getNodes().getLocalNodeId(); Set notVisitedTasks = new HashSet<>(runningTasks.keySet()); if (tasks != null) { - for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { + for (PersistentTask taskInProgress : tasks.tasks()) { if (localNodeId.equals(taskInProgress.getExecutorNode())) { PersistentTaskId persistentTaskId = new PersistentTaskId(taskInProgress.getId(), taskInProgress.getAllocationId()); RunningPersistentTask persistentTask = runningTasks.get(persistentTaskId); @@ -111,10 +111,10 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl } - private void startTask(PersistentTaskInProgress taskInProgress) { + private void startTask(PersistentTask taskInProgress) { PersistentActionRegistry.PersistentActionHolder holder = persistentActionRegistry.getPersistentActionHolderSafe(taskInProgress.getAction()); - PersistentTask task = (PersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]", + NodePersistentTask task = (NodePersistentTask) taskManager.register("persistent", taskInProgress.getAction() + "[c]", taskInProgress.getRequest()); boolean processed = false; try { @@ -283,23 +283,23 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl } private static class RunningPersistentTask implements Provider { - private final PersistentTask task; + private final NodePersistentTask task; private final long id; private final AtomicReference state; @Nullable private Exception failure; - RunningPersistentTask(PersistentTask task, long id) { + RunningPersistentTask(NodePersistentTask task, long id) { this(task, id, State.STARTED); } - RunningPersistentTask(PersistentTask task, long id, State state) { + RunningPersistentTask(NodePersistentTask task, long id, State state) { this.task = task; this.id = id; this.state = new AtomicReference<>(state); } - public PersistentTask getTask() { + public NodePersistentTask getTask() { return task; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionExecutor.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionExecutor.java index 64700448441..540caff25e8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionExecutor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionExecutor.java @@ -21,7 +21,7 @@ public class PersistentActionExecutor { } public void executeAction(Request request, - PersistentTask task, + NodePersistentTask task, PersistentActionRegistry.PersistentActionHolder holder, ActionListener listener) { threadPool.executor(holder.getExecutor()).execute(new AbstractRunnable() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionRequest.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionRequest.java index 0f0609bc2ba..3bec793a402 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionRequest.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionRequest.java @@ -17,6 +17,6 @@ import org.elasticsearch.tasks.TaskId; public abstract class PersistentActionRequest extends ActionRequest implements NamedWriteable, ToXContent { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { - return new PersistentTask(id, type, action, getDescription(), parentTaskId); + return new NodePersistentTask(id, type, action, getDescription(), parentTaskId); } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java index b39aa3d31ef..0b74fafd3db 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java @@ -13,15 +13,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportResponse.Empty; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.util.Objects; @@ -56,7 +55,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C public ClusterState execute(ClusterState currentState) throws Exception { final Assignment assignment; if (stopped) { - assignment = PersistentTasksInProgress.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere + assignment = PersistentTasks.FINISHED_TASK_ASSIGNMENT; // the task is stopped no need to assign it anywhere } else { assignment = getAssignement(action, currentState, request); } @@ -71,7 +70,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse( - ((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId()); + ((PersistentTasks) newState.getMetaData().custom(PersistentTasks.TYPE)).getCurrentId()); } }); } @@ -95,7 +94,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + PersistentTasks.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { if (failure != null) { // If the task failed - we need to restart it on another node, otherwise we just remove it @@ -133,7 +132,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + PersistentTasks.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress .assignTask(id, (action, request) -> getAssignement(action, currentState, request))); @@ -164,7 +163,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + PersistentTasks.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress.removeTask(id)); } else { @@ -195,7 +194,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + PersistentTasks.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress.updateTaskStatus(id, status)); } else { @@ -239,15 +238,15 @@ public class PersistentTaskClusterService extends AbstractComponent implements C } static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { - PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = event.state().getMetaData().custom(PersistentTasks.TYPE); + PersistentTasks prevTasks = event.previousState().getMetaData().custom(PersistentTasks.TYPE); if (tasks != null && (Objects.equals(tasks, prevTasks) == false || event.nodesChanged() || event.routingTableChanged() || event.previousState().nodes().isLocalNodeElectedMaster() == false)) { // We need to check if removed nodes were running any of the tasks and reassign them boolean reassignmentRequired = false; - for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { + for (PersistentTask taskInProgress : tasks.tasks()) { if (taskInProgress.needsReassignment(event.state().nodes())) { // there is an unassigned task or task with a disappeared node - we need to try assigning it if (Objects.equals(taskInProgress.getAssignment(), @@ -287,13 +286,13 @@ public class PersistentTaskClusterService extends AbstractComponent implements C } static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) { - PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = currentState.getMetaData().custom(PersistentTasks.TYPE); ClusterState clusterState = currentState; DiscoveryNodes nodes = currentState.nodes(); if (tasks != null) { logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); // We need to check if removed nodes were running any of the tasks and reassign them - for (PersistentTaskInProgress task : tasks.tasks()) { + for (PersistentTask task : tasks.tasks()) { if (task.needsReassignment(nodes)) { // there is an unassigned task - we need to try assigning it Assignment assignment = decider.getAssignment(task.getAction(), clusterState, task.getRequest()); @@ -316,14 +315,14 @@ public class PersistentTaskClusterService extends AbstractComponent implements C return clusterState; } - private static PersistentTasksInProgress.Builder builder(ClusterState currentState) { - return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE)); + private static PersistentTasks.Builder builder(ClusterState currentState) { + return PersistentTasks.builder(currentState.getMetaData().custom(PersistentTasks.TYPE)); } - private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) { + private static ClusterState update(ClusterState currentState, PersistentTasks.Builder tasksInProgress) { if (tasksInProgress.isChanged()) { return ClusterState.builder(currentState).metaData( - MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build()) + MetaData.builder(currentState.metaData()).putCustom(PersistentTasks.TYPE, tasksInProgress.build()) ).build(); } else { return currentState; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasks.java similarity index 80% rename from plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java rename to plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasks.java index f9c669f91a0..1205432319c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasks.java @@ -43,25 +43,25 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru /** * A cluster state record that contains a list of all running persistent tasks */ -public final class PersistentTasksInProgress extends AbstractNamedDiffable implements MetaData.Custom { +public final class PersistentTasks extends AbstractNamedDiffable implements MetaData.Custom { public static final String TYPE = "persistent_tasks"; private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); // TODO: Implement custom Diff for tasks - private final Map> tasks; + private final Map> tasks; private final long currentId; - public PersistentTasksInProgress(long currentId, Map> tasks) { + public PersistentTasks(long currentId, Map> tasks) { this.currentId = currentId; this.tasks = tasks; } - private static final ObjectParser PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE, Builder::new); + private static final ObjectParser PERSISTENT_TASKS_PARSER = new ObjectParser<>(TYPE, Builder::new); - private static final ObjectParser, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER = - new ObjectParser<>("running_tasks", TaskBuilder::new); + private static final ObjectParser, Void> PERSISTENT_TASK_PARSER = + new ObjectParser<>("tasks", TaskBuilder::new); public static final ConstructingObjectParser ASSIGNMENT_PARSER = new ConstructingObjectParser<>("assignment", objects -> new Assignment((String) objects[0], (String) objects[1])); @@ -73,22 +73,20 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable taskBuilder, List objects) -> { if (objects.size() != 1) { throw new IllegalArgumentException("only one action request per task is allowed"); @@ -96,7 +94,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable taskBuilder, List objects) -> { if (objects.size() != 1) { throw new IllegalArgumentException("only one status per task is allowed"); @@ -105,31 +103,31 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable> tasks() { + public Collection> tasks() { return this.tasks.values(); } - public Map> taskMap() { + public Map> taskMap() { return this.tasks; } - public PersistentTaskInProgress getTask(long id) { + public PersistentTask getTask(long id) { return this.tasks.get(id); } - public Collection> findTasks(String actionName, Predicate> predicate) { + public Collection> findTasks(String actionName, Predicate> predicate) { return this.tasks().stream() .filter(p -> actionName.equals(p.getAction())) .filter(predicate) .collect(Collectors.toList()); } - public boolean tasksExist(String actionName, Predicate> predicate) { + public boolean tasksExist(String actionName, Predicate> predicate) { return this.tasks().stream() .filter(p -> actionName.equals(p.getAction())) .anyMatch(predicate); @@ -139,7 +137,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable implements Writeable, ToXContent { + public static class PersistentTask implements Writeable, ToXContent { private final long id; private final long allocationId; private final String action; @@ -237,24 +235,22 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable task, boolean stopped, Assignment assignment) { + public PersistentTask(PersistentTask task, boolean stopped, Assignment assignment) { this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, assignment, task.allocationId); } - public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { + public PersistentTask(PersistentTask task, Status status) { this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.assignment, task.allocationId); } - private PersistentTaskInProgress(long id, long allocationId, String action, Request request, - boolean stopped, boolean removeOnCompletion, Status status, - Assignment assignment, Long allocationIdOnLastStatusUpdate) { + private PersistentTask(long id, long allocationId, String action, Request request, boolean stopped, boolean removeOnCompletion, + Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; this.action = action; @@ -269,7 +265,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable that = (PersistentTaskInProgress) o; + PersistentTask that = (PersistentTask) o; return id == that.id && allocationId == that.allocationId && Objects.equals(action, that.action) && @@ -481,8 +477,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable build() { - return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, + public PersistentTask build() { + return new PersistentTask<>(id, allocationId, action, request, stopped, removeOnCompletion, status, assignment, allocationIdOnLastStatusUpdate); } } @@ -492,9 +488,9 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable entry : tasks.values()) { + builder.startArray("tasks"); + for (PersistentTask entry : tasks.values()) { entry.toXContent(builder, params); } builder.endArray(); @@ -529,19 +525,19 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable> tasks = new HashMap<>(); + private final Map> tasks = new HashMap<>(); private long currentId; private boolean changed; public Builder() { } - public Builder(PersistentTasksInProgress tasksInProgress) { + public Builder(PersistentTasks tasksInProgress) { if (tasksInProgress != null) { tasks.putAll(tasksInProgress.tasks); currentId = tasksInProgress.currentId; @@ -557,7 +553,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder setTasks(List> tasks) { for (TaskBuilder builder : tasks) { - PersistentTaskInProgress task = builder.build(); + PersistentTask task = builder.build(); this.tasks.put(task.getId(), task); } return this; @@ -572,7 +568,7 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable(currentId, action, request, stopped, removeOnCompletion, assignment)); + tasks.put(currentId, new PersistentTask<>(currentId, action, request, stopped, removeOnCompletion, assignment)); return this; } @@ -580,10 +576,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); + PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); } return this; } @@ -597,12 +593,12 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder assignTask(long taskId, BiFunction executorNodeFunc) { - PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + PersistentTask taskInProgress = (PersistentTask) tasks.get(taskId); if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); if (assignment.isAssigned() || taskInProgress.isStopped()) { changed = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); } } return this; @@ -614,11 +610,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable Builder reassignTask(long taskId, BiFunction executorNodeFunc) { - PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + PersistentTask taskInProgress = (PersistentTask) tasks.get(taskId); if (taskInProgress != null) { changed = true; Assignment assignment = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, assignment)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment)); } return this; } @@ -627,10 +623,10 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); + PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, status)); } return this; } @@ -651,13 +647,13 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable taskInProgress = tasks.get(taskId); + PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; if (taskInProgress.removeOnCompletion) { tasks.remove(taskId); } else { - tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT)); } } return this; @@ -684,8 +680,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable selector) { long minLoad = Long.MAX_VALUE; DiscoveryNode minLoadedNode = null; - PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks persistentTasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { if (selector.test(node)) { - if (persistentTasksInProgress == null) { + if (persistentTasks == null) { // We don't have any task running yet, pick the first available node return node; } - long numberOfTasks = persistentTasksInProgress.getNumberOfTasksOnNode(node.getId(), actionName); + long numberOfTasks = persistentTasks.getNumberOfTasksOnNode(node.getId(), actionName); if (minLoad > numberOfTasks) { minLoad = numberOfTasks; minLoadedNode = node; @@ -103,7 +103,7 @@ public abstract class TransportPersistentAction listener) { + protected void updatePersistentTaskStatus(NodePersistentTask task, Task.Status status, ActionListener listener) { persistentActionService.updateStatus(task.getPersistentTaskId(), status, new ActionListener() { @Override @@ -125,7 +125,7 @@ public abstract class TransportPersistentAction listener); + protected abstract void nodeOperation(NodePersistentTask task, Request request, ActionListener listener); public String getExecutor() { return executor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java index 86fab277a5a..d97640dee02 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/package-info.java @@ -16,7 +16,7 @@ * {@link org.elasticsearch.xpack.persistent.PersistentTaskClusterService} to update cluster state with the record about running persistent * task. *

- * 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} in the cluster state to indicate that + * 2. The master node updates the {@link org.elasticsearch.xpack.persistent.PersistentTasks} in the cluster state to indicate that * there is a new persistent action * running in the system. *

@@ -24,11 +24,11 @@ * the cluster state and starts execution of all new actions assigned to the node it is running on. *

* 4. If the action fails to start on the node, the {@link org.elasticsearch.xpack.persistent.PersistentActionCoordinator} uses the - * {@link org.elasticsearch.xpack.persistent.PersistentTasksInProgress} to notify the + * {@link org.elasticsearch.xpack.persistent.PersistentTasks} to notify the * {@link org.elasticsearch.xpack.persistent.PersistentActionService}, which reassigns the action to another node in the cluster. *

* 5. If action finishes successfully on the node and calls listener.onResponse(), the corresponding persistent action is removed from the - * cluster state. + * cluster state unless . *

* 6. The {@link org.elasticsearch.xpack.persistent.RemovePersistentTaskAction} action can be also used to remove the persistent action. */ diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 7d3ddce0cc0..823f2394973 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -25,8 +25,8 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobTests; import org.elasticsearch.xpack.ml.support.AbstractSerializingTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.Collections; @@ -35,7 +35,7 @@ import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests.createDatafeedJob; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; -import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT; +import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -133,7 +133,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - builder.deleteJob("1", new PersistentTasksInProgress(0L, Collections.emptyMap())); + builder.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap())); result = builder.build(); assertThat(result.getJobs().get("1"), nullValue()); assertThat(result.getDatafeeds().get("1"), nullValue()); @@ -148,10 +148,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getJobs().get("1"), sameInstance(job1)); assertThat(result.getDatafeeds().get("1"), nullValue()); - PersistentTaskInProgress task = createJobTask(0L, "1", null, JobState.CLOSED); + PersistentTask task = createJobTask(0L, "1", null, JobState.CLOSED); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder2.deleteJob("1", new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)))); + () -> builder2.deleteJob("1", new PersistentTasks(0L, Collections.singletonMap(0L, task)))); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); } @@ -163,7 +163,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { builder.putDatafeed(datafeedConfig1); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> builder.deleteJob(job1.getId(), new PersistentTasksInProgress(0L, Collections.emptyMap()))); + () -> builder.deleteJob(job1.getId(), new PersistentTasks(0L, Collections.emptyMap()))); assertThat(e.status(), equalTo(RestStatus.CONFLICT)); String expectedMsg = "Cannot delete job [" + job1.getId() + "] while datafeed [" + datafeedConfig1.getId() + "] refers to it"; assertThat(e.getMessage(), equalTo(expectedMsg)); @@ -172,7 +172,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { public void testRemoveJob_failBecauseJobDoesNotExist() { MlMetadata.Builder builder1 = new MlMetadata.Builder(); expectThrows(ResourceNotFoundException.class, - () -> builder1.deleteJob("1", new PersistentTasksInProgress(0L, Collections.emptyMap()))); + () -> builder1.deleteJob("1", new PersistentTasks(0L, Collections.emptyMap()))); } public void testCrudDatafeed() { @@ -187,7 +187,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); builder = new MlMetadata.Builder(result); - builder.removeDatafeed("datafeed1", new PersistentTasksInProgress(0, Collections.emptyMap())); + builder.removeDatafeed("datafeed1", new PersistentTasks(0, Collections.emptyMap())); result = builder.build(); assertThat(result.getJobs().get("job_id"), sameInstance(job1)); assertThat(result.getDatafeeds().get("datafeed1"), nullValue()); @@ -269,10 +269,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { MlMetadata beforeMetadata = builder.build(); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); - PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); - PersistentTasksInProgress tasksInProgress = - new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); + PersistentTask taskInProgress = + new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); + PersistentTasks tasksInProgress = + new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeedConfig1.getId()); update.setScrollSize(5000); @@ -331,10 +331,10 @@ public class MlMetadataTests extends AbstractSerializingTestCase { assertThat(result.getDatafeeds().get("datafeed1"), sameInstance(datafeedConfig1)); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); - PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); - PersistentTasksInProgress tasksInProgress = - new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); + PersistentTask taskInProgress = + new PersistentTask<>(0, StartDatafeedAction.NAME, request, false, true, INITIAL_ASSIGNMENT); + PersistentTasks tasksInProgress = + new PersistentTasks(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index d4bec15e47a..f1f8a8a5b53 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -13,8 +13,8 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.MlMetadata; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.util.Collections; @@ -26,14 +26,14 @@ public class CloseJobActionTests extends ESTestCase { public void testMoveJobToClosingState() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTaskInProgress task = + PersistentTask task = createJobTask(1L, "job_id", null, randomFrom(JobState.OPENED, JobState.FAILED)); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task)))); ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()); - PersistentTasksInProgress actualTasks = result.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks actualTasks = result.getMetaData().custom(PersistentTasks.TYPE); assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus()); MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE); @@ -44,24 +44,24 @@ public class CloseJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap()))); + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap()))); expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build())); } public void testMoveJobToClosingState_unexpectedJobState() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); - PersistentTaskInProgress task = createJobTask(1L, "job_id", null, JobState.OPENING); + PersistentTask task = createJobTask(1L, "job_id", null, JobState.OPENING); ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.singletonMap(1L, task)))); ElasticsearchStatusException result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build())); assertEquals("cannot close job, expected job state [opened], but got [opening]", result.getMessage()); ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap()))); + .putCustom(PersistentTasks.TYPE, new PersistentTasks(1L, Collections.emptyMap()))); result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build())); assertEquals("cannot close job, expected job state [opened], but got [closed]", result.getMessage()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 1dc40af66e9..cb7a32c1f27 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -32,9 +32,9 @@ import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.net.InetAddress; import java.util.ArrayList; @@ -56,16 +56,16 @@ public class OpenJobActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress task = + PersistentTask task = createJobTask(1L, "job_id", "_node_id", randomFrom(JobState.CLOSED, JobState.FAILED)); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); - OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes); + OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasks(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); task = createJobTask(1L, "job_id", "_other_node_id", JobState.OPENED); - tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); } @@ -94,8 +94,8 @@ public class OpenJobActionTests extends ESTestCase { .build(); JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING); - PersistentTaskInProgress task = createJobTask(1L, "job_id", "_node_id", jobState); - PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTask task = createJobTask(1L, "job_id", "_node_id", jobState); + PersistentTasks tasks1 = new PersistentTasks(1L, Collections.singletonMap(1L, task)); Exception e = expectThrows(ElasticsearchStatusException.class, () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes)); @@ -103,7 +103,7 @@ public class OpenJobActionTests extends ESTestCase { jobState = randomFrom(JobState.OPENING, JobState.CLOSING); task = createJobTask(1L, "job_id", "_other_node_id", jobState); - PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTasks tasks2 = new PersistentTasks(1L, Collections.singletonMap(1L, task)); e = expectThrows(ElasticsearchStatusException.class, () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks2, nodes)); @@ -122,21 +122,21 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); - taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, + Map> taskMap = new HashMap<>(); + taskMap.put(0L, new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, new Assignment("_node_id1", "test assignment"))); - taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, + taskMap.put(1L, new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, new Assignment("_node_id1", "test assignment"))); - taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, + taskMap.put(2L, new PersistentTask<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, new Assignment("_node_id2", "test assignment"))); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap); + PersistentTasks tasks = new PersistentTasks(3L, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4"); cs.nodes(nodes); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasks); + metaData.putCustom(PersistentTasks.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), 2, logger); @@ -150,7 +150,7 @@ public class OpenJobActionTests extends ESTestCase { Map nodeAttr = new HashMap<>(); nodeAttr.put(MAX_RUNNING_JOBS_PER_NODE.getKey(), String.valueOf(maxRunningJobsPerNode)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); - Map> taskMap = new HashMap<>(); + Map> taskMap = new HashMap<>(); for (int i = 0; i < numNodes; i++) { String nodeId = "_node_id" + i; TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i); @@ -160,14 +160,14 @@ public class OpenJobActionTests extends ESTestCase { taskMap.put(id, createJobTask(id, "job_id" + id, nodeId, JobState.OPENED)); } } - PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap); + PersistentTasks tasks = new PersistentTasks(numNodes * maxRunningJobsPerNode, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); addJobAndIndices(metaData, routingTable, "job_id1", "job_id2"); cs.nodes(nodes); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasks); + metaData.putCustom(PersistentTasks.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); @@ -184,17 +184,17 @@ public class OpenJobActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, + PersistentTask task = + new PersistentTask<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, new Assignment("_node_id1", "test assignment")); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task)); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); addJobAndIndices(metaData, routingTable, "job_id1", "job_id2"); cs.nodes(nodes); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasks); + metaData.putCustom(PersistentTasks.TYPE, tasks); cs.metaData(metaData); cs.routingTable(routingTable.build()); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), 2, logger); @@ -214,13 +214,13 @@ public class OpenJobActionTests extends ESTestCase { nodeAttr, Collections.emptySet(), Version.CURRENT)) .build(); - Map> taskMap = new HashMap<>(); + Map> taskMap = new HashMap<>(); taskMap.put(0L, createJobTask(0L, "job_id1", "_node_id1", JobState.OPENING)); taskMap.put(1L, createJobTask(1L, "job_id2", "_node_id1", JobState.OPENING)); taskMap.put(2L, createJobTask(2L, "job_id3", "_node_id2", JobState.OPENING)); taskMap.put(3L, createJobTask(3L, "job_id4", "_node_id2", JobState.OPENING)); taskMap.put(4L, createJobTask(4L, "job_id5", "_node_id3", JobState.OPENING)); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(5L, taskMap); + PersistentTasks tasks = new PersistentTasks(5L, taskMap); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); csBuilder.nodes(nodes); @@ -228,39 +228,39 @@ public class OpenJobActionTests extends ESTestCase { RoutingTable.Builder routingTable = RoutingTable.builder(); addJobAndIndices(metaData, routingTable, "job_id1", "job_id2", "job_id3", "job_id4", "job_id5", "job_id6", "job_id7"); csBuilder.routingTable(routingTable.build()); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasks); + metaData.putCustom(PersistentTasks.TYPE, tasks); csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); Assignment result = OpenJobAction.selectLeastLoadedMlNode("job_id6", cs, 2, logger); assertEquals("_node_id3", result.getExecutorNode()); - PersistentTaskInProgress lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); + PersistentTask lastTask = createJobTask(5L, "job_id6", "_node_id3", JobState.OPENING); taskMap.put(5L, lastTask); - tasks = new PersistentTasksInProgress(6L, taskMap); + tasks = new PersistentTasks(6L, taskMap); csBuilder = ClusterState.builder(cs); - csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks)); cs = csBuilder.build(); result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, false, new Assignment("_node_id3", "test assignment"))); - tasks = new PersistentTasksInProgress(6L, taskMap); + taskMap.put(5L, new PersistentTask<>(lastTask, false, new Assignment("_node_id3", "test assignment"))); + tasks = new PersistentTasks(6L, taskMap); csBuilder = ClusterState.builder(cs); - csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks)); cs = csBuilder.build(); result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); - taskMap.put(5L, new PersistentTaskInProgress<>(lastTask, null)); - tasks = new PersistentTasksInProgress(6L, taskMap); + taskMap.put(5L, new PersistentTask<>(lastTask, null)); + tasks = new PersistentTasks(6L, taskMap); csBuilder = ClusterState.builder(cs); - csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks)); + csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasks.TYPE, tasks)); cs = csBuilder.build(); result = OpenJobAction.selectLeastLoadedMlNode("job_id7", cs, 2, logger); assertNull("no node selected, because null state", result.getExecutorNode()); @@ -304,11 +304,11 @@ public class OpenJobActionTests extends ESTestCase { assertEquals(indexToRemove, result.get(0)); } - public static PersistentTaskInProgress createJobTask(long id, String jobId, String nodeId, JobState jobState) { - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, + public static PersistentTask createJobTask(long id, String jobId, String nodeId, JobState jobState) { + PersistentTask task = + new PersistentTask<>(id, OpenJobAction.NAME, new OpenJobAction.Request(jobId), false, true, new Assignment(nodeId, "test assignment")); - task = new PersistentTaskInProgress<>(task, jobState); + task = new PersistentTask<>(task, jobState); return task; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index ad29a52d96a..bd89060a8a1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -21,9 +21,9 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.net.InetAddress; import java.util.Collections; @@ -33,7 +33,7 @@ import java.util.Map; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; -import static org.elasticsearch.xpack.persistent.PersistentTasksInProgress.INITIAL_ASSIGNMENT; +import static org.elasticsearch.xpack.persistent.PersistentTasks.INITIAL_ASSIGNMENT; import static org.hamcrest.Matchers.equalTo; public class StartDatafeedActionTests extends ESTestCase { @@ -45,8 +45,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); JobState jobState = randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING); - PersistentTaskInProgress task = createJobTask(0L, job.getId(), "node_id", jobState); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask(0L, job.getId(), "node_id", jobState); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -55,7 +55,7 @@ public class StartDatafeedActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .putCustom(PersistentTasks.TYPE, tasks)) .nodes(nodes); Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); @@ -64,10 +64,10 @@ public class StartDatafeedActionTests extends ESTestCase { "] while state [opened] is required", result.getExplanation()); task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .putCustom(PersistentTasks.TYPE, tasks)) .nodes(nodes); result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); assertEquals("node_id", result.getExecutorNode()); @@ -80,8 +80,8 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); String nodeId = randomBoolean() ? "node_id2" : null; - PersistentTaskInProgress task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask(0L, job.getId(), nodeId, JobState.OPENED); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), @@ -90,7 +90,7 @@ public class StartDatafeedActionTests extends ESTestCase { ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .putCustom(PersistentTasks.TYPE, tasks)) .nodes(nodes); Assignment result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); @@ -99,10 +99,10 @@ public class StartDatafeedActionTests extends ESTestCase { result.getExplanation()); task = createJobTask(0L, job.getId(), "node_id1", JobState.OPENED); - tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .putCustom(PersistentTasks.TYPE, tasks)) .nodes(nodes); result = StartDatafeedAction.selectNode(logger, "datafeed_id", cs.build()); assertEquals("node_id1", result.getExecutorNode()); @@ -123,10 +123,10 @@ public class StartDatafeedActionTests extends ESTestCase { MlMetadata mlMetadata1 = new MlMetadata.Builder() .putJob(job1, false) .build(); - PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, + PersistentTask task = + new PersistentTask<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, INITIAL_ASSIGNMENT); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); + PersistentTasks tasks = new PersistentTasks(0L, Collections.singletonMap(0L, task)); DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "job_id").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) .putDatafeed(datafeedConfig1) @@ -148,15 +148,15 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); - PersistentTaskInProgress datafeedTask = - new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + PersistentTask jobTask = createJobTask(0L, "job_id", "node_id", JobState.OPENED); + PersistentTask datafeedTask = + new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, new Assignment("node_id", "test assignment")); - datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); - Map> taskMap = new HashMap<>(); + datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); + Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); taskMap.put(1L, datafeedTask); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap); + PersistentTasks tasks = new PersistentTasks(2L, taskMap); Exception e = expectThrows(ElasticsearchStatusException.class, () -> StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes)); @@ -176,20 +176,20 @@ public class StartDatafeedActionTests extends ESTestCase { Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); - PersistentTaskInProgress jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED); - PersistentTaskInProgress datafeedTask = - new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + PersistentTask jobTask = createJobTask(0L, "job_id", "node_id2", JobState.OPENED); + PersistentTask datafeedTask = + new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, new Assignment("node_id1", "test assignment")); - datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); - Map> taskMap = new HashMap<>(); + datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); + Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); taskMap.put(1L, datafeedTask); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(2L, taskMap); + PersistentTasks tasks = new PersistentTasks(2L, taskMap); StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); - datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), + datafeedTask = new PersistentTask<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), false, true, INITIAL_ASSIGNMENT); - datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); + datafeedTask = new PersistentTask<>(datafeedTask, DatafeedState.STARTED); taskMap.put(1L, datafeedTask); StartDatafeedAction.validate("datafeed_id", mlMetadata1, tasks, nodes); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index fa899ffaa4a..785870f7430 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -16,8 +16,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.persistent.PersistentActionRequest; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.util.Collections; @@ -47,10 +47,10 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testValidate() { - PersistentTaskInProgress task = new PersistentTaskInProgress(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", "")); - task = new PersistentTaskInProgress<>(task, DatafeedState.STARTED); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasks.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STARTED); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task)); Job job = createDatafeedJob().build(); MlMetadata mlMetadata1 = new MlMetadata.Builder().putJob(job, false).build(); @@ -66,14 +66,14 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe } public void testValidate_alreadyStopped() { - PersistentTasksInProgress tasks; + PersistentTasks tasks; if (randomBoolean()) { - PersistentTaskInProgress task = new PersistentTaskInProgress(1L, StartDatafeedAction.NAME, - new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasksInProgress.Assignment("node_id", "")); - task = new PersistentTaskInProgress<>(task, DatafeedState.STOPPED); - tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); + PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("foo", 0L), false, false, new PersistentTasks.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STOPPED); + tasks = new PersistentTasks(1L, Collections.singletonMap(1L, task)); } else { - tasks = randomBoolean() ? null : new PersistentTasksInProgress(0L, Collections.emptyMap()); + tasks = randomBoolean() ? null : new PersistentTasks(0L, Collections.emptyMap()); } Job job = createDatafeedJob().build(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 165fd552acf..b08f9e48670 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -40,8 +40,8 @@ import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.notifications.AuditMessage; import org.elasticsearch.xpack.ml.notifications.Auditor; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Response; import org.junit.Before; @@ -91,15 +91,15 @@ public class DatafeedJobRunnerTests extends ESTestCase { Job job = createDatafeedJob().build(); mlMetadata.putJob(job, false); mlMetadata.putDatafeed(createDatafeedConfig("datafeed_id", job.getId()).build()); - PersistentTaskInProgress task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); - PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); + PersistentTask task = createJobTask(0L, job.getId(), "node_id", JobState.OPENED); + PersistentTasks tasks = new PersistentTasks(1L, Collections.singletonMap(0L, task)); DiscoveryNodes nodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node_name", "node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT)) .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksInProgress.TYPE, tasks)) + .putCustom(PersistentTasks.TYPE, tasks)) .nodes(nodes); clusterService = mock(ClusterService.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 3691ccfd849..9081f4beca9 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -25,8 +25,8 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.io.IOException; import java.util.Collection; @@ -164,8 +164,8 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); - PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); + PersistentTask task = tasks.taskMap().values().iterator().next(); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); Map expectedNodeAttr = new HashMap<>(); @@ -215,13 +215,13 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { // Sample each cs update and keep track each time a node holds more than `maxConcurrentJobAllocations` opening jobs. List violations = new CopyOnWriteArrayList<>(); internalCluster().clusterService(nonMlNode).addListener(event -> { - PersistentTasksInProgress tasks = event.state().metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = event.state().metaData().custom(PersistentTasks.TYPE); if (tasks == null) { return; } for (DiscoveryNode node : event.state().nodes()) { - Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { + Collection> foundTasks = tasks.findTasks(OpenJobAction.NAME, task -> { return node.getId().equals(task.getExecutorNode()) && (task.getStatus() == null || task.getStatus() == JobState.OPENING || task.isCurrentStatus() == false); }); @@ -246,9 +246,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); assertEquals(numJobs, tasks.taskMap().size()); - for (PersistentTaskInProgress task : tasks.taskMap().values()) { + for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); assertEquals(JobState.OPENED, task.getStatus()); } @@ -270,9 +270,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureStableCluster(1, nonMlNode); assertBusy(() -> { ClusterState state = client(nonMlNode).admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); assertEquals(numJobs, tasks.taskMap().size()); - for (PersistentTaskInProgress task : tasks.taskMap().values()) { + for (PersistentTask task : tasks.taskMap().values()) { assertNull(task.getExecutorNode()); } }); @@ -286,9 +286,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { ensureStableCluster(1 + numMlNodes); assertBusy(() -> { ClusterState state = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); assertEquals(numJobs, tasks.taskMap().size()); - for (PersistentTaskInProgress task : tasks.taskMap().values()) { + for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); assertEquals(JobState.OPENED, task.getStatus()); } @@ -331,7 +331,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { client().execute(CloseJobAction.INSTANCE, closeJobRequest); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); assertEquals(0, tasks.taskMap().size()); }); logger.info("Stop data node"); @@ -354,9 +354,9 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); assertEquals(1, tasks.taskMap().size()); - PersistentTaskInProgress task = tasks.findTasks(OpenJobAction.NAME, p -> { + PersistentTask task = tasks.findTasks(OpenJobAction.NAME, p -> { return p.getRequest() instanceof OpenJobAction.Request && jobId.equals(((OpenJobAction.Request) p.getRequest()).getJobId()); }).iterator().next(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index db55f50b9b7..d564914e912 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -24,8 +24,8 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -125,10 +125,10 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase { disrupt.run(); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.metaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.metaData().custom(PersistentTasks.TYPE); assertNotNull(tasks); assertEquals(2, tasks.taskMap().size()); - for (PersistentTaskInProgress task : tasks.tasks()) { + for (PersistentTask task : tasks.tasks()) { assertFalse(task.needsReassignment(clusterState.nodes())); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 2c6c782013a..224657b22b6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -17,7 +17,7 @@ import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks; public class TooManyJobsIT extends BaseMlIntegTestCase { @@ -49,10 +49,10 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet(); assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.CLOSED); ClusterState state = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE); assertEquals(1, tasks.taskMap().size()); // now just double check that the first job is still opened: - PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); + PersistentTasks.PersistentTask task = tasks.taskMap().values().iterator().next(); assertEquals(JobState.OPENED, task.getStatus()); OpenJobAction.Request openJobRequest = (OpenJobAction.Request) task.getRequest(); assertEquals("1", openJobRequest.getJobId()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java index 3095bd431cc..53ef7ca6301 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; import java.io.IOException; @@ -72,7 +72,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase { ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) .build(); - PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); + PersistentTasks.Builder tasks = PersistentTasks.builder(); boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { @@ -91,7 +91,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase { } MetaData.Builder metaData = MetaData.builder(state.metaData()); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + metaData.putCustom(PersistentTasks.TYPE, tasks.build()); ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); @@ -288,36 +288,36 @@ public class PersistentActionCoordinatorTests extends ESTestCase { private ClusterState addTask(ClusterState state, String action, Request request, String node) { - PersistentTasksInProgress.Builder builder = - PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); - return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + PersistentTasks.Builder builder = + PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE, builder.addTask(action, request, false, true, new Assignment(node, "test assignment")).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) { - PersistentTasksInProgress.Builder builder = - PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + PersistentTasks.Builder builder = + PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE)); assertTrue(builder.hasTask(taskId)); - return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE, builder.reassignTask(taskId, new Assignment(node, "test assignment")).build())).build(); } private ClusterState removeTask(ClusterState state, long taskId) { - PersistentTasksInProgress.Builder builder = - PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + PersistentTasks.Builder builder = + PersistentTasks.builder(state.getMetaData().custom(PersistentTasks.TYPE)); assertTrue(builder.hasTask(taskId)); - return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasks.TYPE, builder.removeTask(taskId).build())).build(); } private class Execution { private final PersistentActionRequest request; - private final PersistentTask task; + private final NodePersistentTask task; private final PersistentActionRegistry.PersistentActionHolder holder; private final ActionListener listener; - Execution(PersistentActionRequest request, PersistentTask task, PersistentActionRegistry.PersistentActionHolder holder, - ActionListener listener) { + Execution(PersistentActionRequest request, NodePersistentTask task, PersistentActionRegistry.PersistentActionHolder holder, + ActionListener listener) { this.request = request; this.task = task; this.holder = holder; @@ -333,7 +333,7 @@ public class PersistentActionCoordinatorTests extends ESTestCase { } @Override - public void executeAction(Request request, PersistentTask task, + public void executeAction(Request request, NodePersistentTask task, PersistentActionRegistry.PersistentActionHolder holder, ActionListener listener) { executions.add(new Execution(request, task, holder, listener)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java index cff1cb0a77d..d784f1cc6ae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java @@ -8,7 +8,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; @@ -59,8 +59,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase { } } final int numberOfRunningTasks = runningTasks; - PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); if (numberOfRunningTasks > 0) { @@ -76,11 +76,11 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase { internalCluster().fullRestart(); ensureYellow(); - tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); // Check that cluster state is correct for (int i = 0; i < numberOfTasks; i++) { - PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + PersistentTask task = tasksInProgress.getTask(taskIds[i]); assertNotNull(task); assertThat(task.isStopped(), equalTo(stopped[i])); } @@ -93,9 +93,9 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase { }); // Start all other tasks - tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasks.TYPE); for (int i = 0; i < numberOfTasks; i++) { - PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + PersistentTask task = tasksInProgress.getTask(taskIds[i]); assertNotNull(task); logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode()); assertThat(task.isStopped(), equalTo(stopped[i])); @@ -119,8 +119,8 @@ public class PersistentActionFullRestartIT extends ESIntegTestCase { assertBusy(() -> { // Make sure the task is removed from the cluster state - assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); + assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE)).tasks(), empty()); }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java index 72880e3fcbc..3a37a5d92c7 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java @@ -112,8 +112,8 @@ public class PersistentActionIT extends ESIntegTestCase { .setRemoveOnCompletion(false) .setStopped(stopped).get().getTaskId(); - PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped)); assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue()); @@ -150,8 +150,8 @@ public class PersistentActionIT extends ESIntegTestCase { assertBusy(() -> { // Wait for the task to be marked as stopped - PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE); assertThat(tasks.tasks().size(), equalTo(1)); assertThat(tasks.getTask(taskId).isStopped(), equalTo(true)); assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); @@ -202,8 +202,8 @@ public class PersistentActionIT extends ESIntegTestCase { TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") .get().getTasks().get(0); - PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue()); @@ -216,8 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase { int finalI = i; assertBusy(() -> { - PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE); assertThat(tasks.tasks().size(), equalTo(1)); assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue()); assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); @@ -258,8 +258,8 @@ public class PersistentActionIT extends ESIntegTestCase { assertThat(tasks.size(), equalTo(0)); // Make sure the task is removed from the cluster state - assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() - .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); + assertThat(((PersistentTasks) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasks.TYPE)).tasks(), empty()); }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java index d27a68fc14b..e63b6baabb3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java @@ -18,8 +18,8 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; import java.util.ArrayList; @@ -66,14 +66,14 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { public void testReassignTasksWithNoTasks() { ClusterState clusterState = initialState(); - assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue()); + assertThat(reassign(clusterState).metaData().custom(PersistentTasks.TYPE), nullValue()); } public void testReassignConsidersClusterStateUpdates() { ClusterState clusterState = initialState(); ClusterState.Builder builder = ClusterState.builder(clusterState); - PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( - clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + PersistentTasks.Builder tasks = PersistentTasks.builder( + clusterState.metaData().custom(PersistentTasks.TYPE)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(2, 40); @@ -81,11 +81,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false); } - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build()); clusterState = builder.metaData(metaData).nodes(nodes).build(); ClusterState newClusterState = reassign(clusterState); - PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE); assertThat(tasksInProgress, notNullValue()); } @@ -93,8 +93,8 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { public void testReassignTasks() { ClusterState clusterState = initialState(); ClusterState.Builder builder = ClusterState.builder(clusterState); - PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( - clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + PersistentTasks.Builder tasks = PersistentTasks.builder( + clusterState.metaData().custom(PersistentTasks.TYPE)); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(0, 40); @@ -118,11 +118,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { } } - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, tasks.build()); clusterState = builder.metaData(metaData).nodes(nodes).build(); ClusterState newClusterState = reassign(clusterState); - PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = newClusterState.getMetaData().custom(PersistentTasks.TYPE); assertThat(tasksInProgress, notNullValue()); assertThat("number of tasks shouldn't change as a result or reassignment", @@ -130,7 +130,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { int assignOneCount = 0; - for (PersistentTaskInProgress task : tasksInProgress.tasks()) { + for (PersistentTask task : tasksInProgress.tasks()) { if (task.isStopped()) { assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); assertThat(task.getAssignment().getExplanation(), equalTo("explanation: " + task.getAction())); @@ -141,7 +141,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { assertThat(task.getExecutorNode(), notNullValue()); assertThat(task.isAssigned(), equalTo(true)); if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { - logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString()); + logger.info(clusterState.metaData().custom(PersistentTasks.TYPE).toString()); } assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); @@ -203,7 +203,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { DiscoveryNodes nodes = clusterState.nodes(); - PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasksInProgress = clusterState.getMetaData().custom(PersistentTasks.TYPE); if (tasksInProgress.findTasks("assign_one", task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { return randomNodeAssignment(clusterState.nodes()); @@ -232,15 +232,15 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { return "nodes_changed: " + event.nodesChanged() + " nodes_removed:" + event.nodesRemoved() + " routing_table_changed:" + event.routingTableChanged() + - " tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE); + " tasks: " + event.state().metaData().custom(PersistentTasks.TYPE); } private ClusterState significantChange(ClusterState clusterState) { ClusterState.Builder builder = ClusterState.builder(clusterState); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); if (tasks != null) { if (randomBoolean()) { - for (PersistentTaskInProgress task : tasks.tasks()) { + for (PersistentTask task : tasks.tasks()) { if (task.isAssigned() && clusterState.nodes().nodeExists(task.getExecutorNode())) { logger.info("removed node {}", task.getExecutorNode()); builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode())); @@ -255,11 +255,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { // we don't have any unassigned tasks - add some if (randomBoolean()) { logger.info("added random task"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks), null, false); tasksOrNodesChanged = true; } else { logger.info("added unassignable task with custom assignment message"); - addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasks.builder(tasks), new Assignment(null, "change me"), "never_assign", false); tasksOrNodesChanged = true; } @@ -282,11 +282,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { return builder.build(); } - private PersistentTasksInProgress removeTasksWithChangingAssignment(PersistentTasksInProgress tasks) { + private PersistentTasks removeTasksWithChangingAssignment(PersistentTasks tasks) { if (tasks != null) { boolean changed = false; - PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); - for (PersistentTaskInProgress task : tasks.tasks()) { + PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks); + for (PersistentTask task : tasks.tasks()) { // Remove all unassigned tasks that cause changing assignments they might trigger a significant change if ("never_assign".equals(((TestRequest) task.getRequest()).getTestParam()) && "change me".equals(task.getAssignment().getExplanation())) { @@ -304,9 +304,9 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { private ClusterState insignificantChange(ClusterState clusterState) { ClusterState.Builder builder = ClusterState.builder(clusterState); - PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasks tasks = clusterState.getMetaData().custom(PersistentTasks.TYPE); tasks = removeTasksWithChangingAssignment(tasks); - PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); + PersistentTasks.Builder tasksBuilder = PersistentTasks.builder(tasks); if (randomBoolean()) { if (hasAssignableTasks(tasks, clusterState.nodes()) == false) { @@ -328,7 +328,7 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { } else { logger.info("changed routing table"); MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); - metaData.putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build()); + metaData.putCustom(PersistentTasks.TYPE, tasksBuilder.build()); RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); changeRoutingTable(metaData, routingTable); builder.metaData(metaData).routingTable(routingTable.build()); @@ -350,18 +350,18 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { // clear the task if (randomBoolean()) { logger.info("removed all tasks"); - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, - PersistentTasksInProgress.builder().build()); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasks.TYPE, + PersistentTasks.builder().build()); return builder.metaData(metaData).build(); } else { logger.info("set task custom to null"); - MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasks.TYPE); return builder.metaData(metaData).build(); } } logger.info("removed all unassigned tasks and changed routing table"); if (tasks != null) { - for (PersistentTaskInProgress task : tasks.tasks()) { + for (PersistentTask task : tasks.tasks()) { if (task.getExecutorNode() == null || "never_assign".equals(((TestRequest) task.getRequest()).getTestParam())) { tasksBuilder.removeTask(task.getId()); } @@ -374,11 +374,11 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { .numberOfReplicas(1) .build(); MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false) - .putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build()); + .putCustom(PersistentTasks.TYPE, tasksBuilder.build()); return builder.metaData(metaData).build(); } - private boolean hasAssignableTasks(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) { + private boolean hasAssignableTasks(PersistentTasks tasks, DiscoveryNodes discoveryNodes) { if (tasks == null || tasks.tasks().isEmpty()) { return false; } @@ -393,26 +393,26 @@ public class PersistentTaskClusterServiceTests extends ESTestCase { }); } - private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) { + private boolean hasTasksAssignedTo(PersistentTasks tasks, String nodeId) { return tasks != null && tasks.tasks().stream().anyMatch( task -> nodeId.equals(task.getExecutorNode())) == false; } private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, - MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, + MetaData.Builder metaData, PersistentTasks.Builder tasks, String node, boolean stopped) { return addRandomTask(clusterStateBuilder, metaData, tasks, new Assignment(node, randomAsciiOfLength(10)), randomAsciiOfLength(10), stopped); } private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, - MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, + MetaData.Builder metaData, PersistentTasks.Builder tasks, Assignment assignment, String param, boolean stopped) { - return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE, + return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasks.TYPE, tasks.addTask(randomAsciiOfLength(10), new TestRequest(param), stopped, randomBoolean(), assignment).build())); } - private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) { + private void addTask(PersistentTasks.Builder tasks, String action, String param, String node, boolean stopped) { tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), new Assignment(node, "explanation: " + action)); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksTests.java similarity index 85% rename from plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java rename to plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksTests.java index f93b786dc5f..ea2ac1577fb 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksTests.java @@ -21,9 +21,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Builder; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.Builder; +import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; @@ -37,12 +37,12 @@ import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; import static org.elasticsearch.xpack.persistent.TransportPersistentAction.NO_NODE_FOUND; -public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase { +public class PersistentTasksTests extends AbstractDiffableSerializationTestCase { @Override - protected PersistentTasksInProgress createTestInstance() { + protected PersistentTasks createTestInstance() { int numberOfTasks = randomInt(10); - PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); + PersistentTasks.Builder tasks = PersistentTasks.builder(); for (int i = 0; i < numberOfTasks; i++) { boolean stopped = randomBoolean(); tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), @@ -57,14 +57,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio @Override protected Writeable.Reader instanceReader() { - return PersistentTasksInProgress::new; + return PersistentTasks::new; } @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Arrays.asList( - new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), - new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new), + new Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom), new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new Entry(Task.Status.class, Status.NAME, Status::new) )); @@ -72,7 +72,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio @Override protected Custom makeTestChanges(Custom testInstance) { - PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance; + PersistentTasks tasksInProgress = (PersistentTasks) testInstance; Builder builder = new Builder(); switch (randomInt(3)) { case 0: @@ -105,12 +105,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio @Override protected Writeable.Reader> diffReader() { - return PersistentTasksInProgress::readDiffFrom; + return PersistentTasks::readDiffFrom; } @Override - protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException { - return PersistentTasksInProgress.fromXContent(parser); + protected PersistentTasks doParseInstance(XContentParser parser) throws IOException { + return PersistentTasks.fromXContent(parser); } @Override @@ -142,7 +142,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio return builder; } - private long pickRandomTask(PersistentTasksInProgress testInstance) { + private long pickRandomTask(PersistentTasks testInstance) { return randomFrom(new ArrayList<>(testInstance.tasks())).getId(); } @@ -157,9 +157,9 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio @SuppressWarnings("unchecked") public void testSerializationContext() throws Exception { - PersistentTasksInProgress testInstance = createTestInstance(); + PersistentTasks testInstance = createTestInstance(); for (int i = 0; i < randomInt(10); i++) { - testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance); + testInstance = (PersistentTasks) makeTestChanges(testInstance); } ToXContent.MapParams params = new ToXContent.MapParams( @@ -170,12 +170,12 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio XContentBuilder shuffled = shuffleXContent(builder); XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes()); - PersistentTasksInProgress newInstance = doParseInstance(parser); + PersistentTasks newInstance = doParseInstance(parser); assertNotSame(newInstance, testInstance); assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); - for (PersistentTaskInProgress testTask : testInstance.tasks()) { - PersistentTaskInProgress newTask = (PersistentTaskInProgress) newInstance.getTask(testTask.getId()); + for (PersistentTask testTask : testInstance.tasks()) { + PersistentTask newTask = (PersistentTask) newInstance.getTask(testTask.getId()); assertNotNull(newTask); // Things that should be serialized @@ -192,14 +192,14 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio } public void testBuilder() { - PersistentTasksInProgress persistentTasksInProgress = null; + PersistentTasks persistentTasks = null; long lastKnownTask = -1; for (int i = 0; i < randomIntBetween(10, 100); i++) { final Builder builder; if (randomBoolean()) { builder = new Builder(); } else { - builder = new Builder(persistentTasksInProgress); + builder = new Builder(persistentTasks); } boolean changed = false; for (int j = 0; j < randomIntBetween(1, 10); j++) { @@ -220,7 +220,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio break; case 2: if (builder.hasTask(lastKnownTask)) { - PersistentTaskInProgress task = builder.build().getTask(lastKnownTask); + PersistentTask task = builder.build().getTask(lastKnownTask); if (randomBoolean()) { // Trying to reassign to the same node builder.assignTask(lastKnownTask, (s, request) -> task.getAssignment()); @@ -263,7 +263,7 @@ public class PersistentTasksInProgressTests extends AbstractDiffableSerializatio } } assertEquals(changed, builder.isChanged()); - persistentTasksInProgress = builder.build(); + persistentTasks = builder.build(); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java index 3cde981a7c0..7389f2f4b09 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java @@ -49,7 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import java.io.IOException; import java.util.ArrayList; @@ -107,8 +107,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new), - new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), - new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasks.TYPE, PersistentTasks::new), + new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasks.TYPE, PersistentTasks::readDiffFrom), new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) ); } @@ -116,8 +116,8 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { @Override public List getNamedXContent() { return Arrays.asList( - new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE), - PersistentTasksInProgress::fromXContent), + new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasks.TYPE), + PersistentTasks::fromXContent), new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME), TestRequest::fromXContent), new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) @@ -368,7 +368,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { } @Override - protected void nodeOperation(PersistentTask task, TestRequest request, ActionListener listener) { + protected void nodeOperation(NodePersistentTask task, TestRequest request, ActionListener listener) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -451,7 +451,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { } - public static class TestTask extends PersistentTask { + public static class TestTask extends NodePersistentTask { private volatile String operation; public TestTask(long id, String type, String action, String description, TaskId parentTask) {