diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b1c87951c5d..b6d6ea02264 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -131,7 +131,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksExecutorRegistry; import org.elasticsearch.xpack.persistent.PersistentTasksNodeService; import org.elasticsearch.xpack.persistent.PersistentTasksService; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.xpack.security.InternalClient; @@ -305,9 +304,9 @@ public class MachineLearning implements ActionPlugin { new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager); PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList( - new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, clusterService, + new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, clusterService, autodetectProcessManager, auditor), - new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, persistentTasksService, + new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState, datafeedManager, auditor) )); @@ -423,7 +422,6 @@ public class MachineLearning implements ActionPlugin { new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), - new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index d232ef52501..ddad90d4ff4 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -42,7 +42,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.security.InternalClient; @@ -276,9 +275,9 @@ public class CloseJobAction extends Action listener) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); - persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() { + persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask task) { listener.onResponse(new Response(true)); } @@ -301,9 +300,9 @@ public class CloseJobAction extends Action listener) { persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.timeout, - new WaitForPersistentTaskStatusListener() { + new WaitForPersistentTaskStatusListener() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask task) { logger.debug("finalizing job [{}]", request.getJobId()); FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(request.getJobId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 2e6957c0ae2..be6139e965c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -56,7 +56,6 @@ import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignme import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksExecutor; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import java.io.IOException; @@ -307,10 +306,10 @@ public class OpenJobAction extends Action listener) { if (licenseState.isMachineLearningAllowed()) { - PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() { + ActionListener> finalListener = new ActionListener>() { @Override - public void onResponse(long taskId) { - waitForJobStarted(taskId, request, listener); + public void onResponse(PersistentTask task) { + waitForJobStarted(task.getId(), request, listener); } @Override @@ -318,7 +317,7 @@ public class OpenJobAction extends Action listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout, - new WaitForPersistentTaskStatusListener() { + new WaitForPersistentTaskStatusListener() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { listener.onResponse(new Response(predicate.opened)); } @@ -377,9 +376,9 @@ public class OpenJobAction extends Action listener) { if (licenseState.isMachineLearningAllowed()) { - PersistentTaskOperationListener finalListener = new PersistentTaskOperationListener() { + ActionListener> finalListener = new ActionListener>() { @Override - public void onResponse(long taskId) { - waitForYellow(taskId, request, listener); + public void onResponse(PersistentTask persistentTask) { + waitForYellow(persistentTask.getId(), request, listener); } @Override @@ -373,7 +372,7 @@ public class StartDatafeedAction listener.onFailure(e); } }; - persistentTasksService.createPersistentActionTask(NAME, request, finalListener); + persistentTasksService.startPersistentTask(NAME, request, finalListener); } else { listener.onFailure(LicenseUtils.newComplianceException(XPackPlugin.MACHINE_LEARNING)); } @@ -404,9 +403,9 @@ public class StartDatafeedAction return datafeedState == DatafeedState.STARTED; }; persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, request.timeout, - new WaitForPersistentTaskStatusListener() { + new WaitForPersistentTaskStatusListener() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask task) { listener.onResponse(new Response(true)); } @@ -425,9 +424,8 @@ public class StartDatafeedAction private final ThreadPool threadPool; public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState, - PersistentTasksService persistentTasksService, DatafeedManager datafeedManager, - Auditor auditor) { - super(settings, NAME, persistentTasksService, ThreadPool.Names.MANAGEMENT); + DatafeedManager datafeedManager, Auditor auditor) { + super(settings, NAME, ThreadPool.Names.MANAGEMENT); this.licenseState = licenseState; this.datafeedManager = datafeedManager; this.auditor = auditor; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 28307672592..352a5ea29a2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -41,7 +41,6 @@ import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import java.io.IOException; @@ -256,9 +255,9 @@ public class StopDatafeedAction // so wait for that to happen here. void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener listener) { persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(), - new WaitForPersistentTaskStatusListener() { + new WaitForPersistentTaskStatusListener() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask task) { listener.onResponse(response); } @@ -270,9 +269,9 @@ public class StopDatafeedAction } private void forceStopTask(long persistentTaskId, ActionListener listener) { - persistentTasksService.removeTask(persistentTaskId, new PersistentTaskOperationListener() { + persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { listener.onResponse(new Response(true)); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index e1d18b88527..9b541adba8d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -32,8 +32,9 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import java.time.Duration; import java.util.Collections; @@ -94,9 +95,9 @@ public class DatafeedManager extends AbstractComponent { } Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task); runningDatafeeds.put(datafeedId, holder); - task.updatePersistentStatus(DatafeedState.STARTED, new PersistentTaskOperationListener() { + task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { innerRun(holder, task.getDatafeedStartTime(), task.getEndTime()); } @@ -359,9 +360,9 @@ public class DatafeedManager extends AbstractComponent { private void closeJob() { persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(20), - new WaitForPersistentTaskStatusListener() { + new WaitForPersistentTaskStatusListener() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask PersistentTask) { CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(datafeed.getJobId()); client.execute(CloseJobAction.INSTANCE, closeJobRequest, new ActionListener() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 3871408d55a..806fbe656c3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; @@ -43,7 +44,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater; import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.io.InputStream; @@ -53,7 +54,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -349,9 +349,9 @@ public class AutodetectProcessManager extends AbstractComponent { } private void setJobState(JobTask jobTask, JobState state) { - jobTask.updatePersistentStatus(state, new PersistentTaskOperationListener() { + jobTask.updatePersistentStatus(state, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()); } @@ -363,9 +363,9 @@ public class AutodetectProcessManager extends AbstractComponent { } public void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { - jobTask.updatePersistentStatus(state, new PersistentTaskOperationListener() { + jobTask.updatePersistentStatus(state, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { try { handler.accept(null); } catch (IOException e1) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java index 61b7fd027f8..86c3e5ccae6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/AllocatedPersistentTask.java @@ -57,7 +57,7 @@ public class AllocatedPersistentTask extends CancellableTask { * * This doesn't affect the status of this allocated task. */ - public void updatePersistentStatus(Task.Status status, PersistentTasksService.PersistentTaskOperationListener listener) { + public void updatePersistentStatus(Task.Status status, ActionListener> listener) { persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java index cb3fe39a1c3..c1b91f5f029 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CompletionPersistentTaskAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -24,8 +23,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -35,7 +34,7 @@ import java.util.Objects; * removed from the cluster state in case of successful completion or restarted on some other node in case of failure. */ public class CompletionPersistentTaskAction extends Action { public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction(); @@ -51,8 +50,8 @@ public class CompletionPersistentTaskAction extends Action { @@ -104,49 +103,15 @@ public class CompletionPersistentTaskAction extends Action { + PersistentTaskResponse, CompletionPersistentTaskAction.RequestBuilder> { protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAction action) { super(client, action, new Request()); } } - public static class TransportAction extends TransportMasterNodeAction { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -166,8 +131,8 @@ public class CompletionPersistentTaskAction extends Action listener) { - persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, new ActionListener() { - @Override - public void onResponse(Empty empty) { - listener.onResponse(newResponse()); - } + protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { + persistentTasksClusterService.completePersistentTask(request.taskId, request.exception, + new ActionListener>() { + @Override + public void onResponse(PersistentTask task) { + listener.onResponse(new PersistentTaskResponse(task)); + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java index 223064e6128..1e99581f13b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -185,10 +186,11 @@ public class CreatePersistentTaskAction extends Action listener) { persistentTasksClusterService.createPersistentTask(request.action, request.request, - new ActionListener() { + new ActionListener>() { + @Override - public void onResponse(Long newTaskId) { - listener.onResponse(new PersistentTaskResponse(newTaskId)); + public void onResponse(PersistentTask task) { + listener.onResponse(new PersistentTaskResponse(task)); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskResponse.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskResponse.java index af55acf8f04..6b2e85004b7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskResponse.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskResponse.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -16,30 +17,30 @@ import java.util.Objects; * Response upon a successful start or an persistent task */ public class PersistentTaskResponse extends ActionResponse { - private long taskId; + private PersistentTask task; public PersistentTaskResponse() { super(); } - public PersistentTaskResponse(long taskId) { - this.taskId = taskId; + public PersistentTaskResponse(PersistentTask task) { + this.task = task; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - taskId = in.readLong(); + task = in.readOptionalWriteable(PersistentTask::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeLong(taskId); + out.writeOptionalWriteable(task); } - public long getTaskId() { - return taskId; + public PersistentTask getTask() { + return task; } @Override @@ -47,11 +48,11 @@ public class PersistentTaskResponse extends ActionResponse { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; PersistentTaskResponse that = (PersistentTaskResponse) o; - return taskId == that.taskId; + return Objects.equals(task, that.task); } @Override public int hashCode() { - return Objects.hash(taskId); + return Objects.hash(task); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java index fc7735da4f6..1a49da04313 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksClusterService.java @@ -48,7 +48,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param listener the listener that will be called when task is started */ public void createPersistentTask(String action, Request request, - ActionListener listener) { + ActionListener> listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -63,10 +63,15 @@ public class PersistentTasksClusterService extends AbstractComponent implements listener.onFailure(e); } + @SuppressWarnings("unchecked") @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse( - ((PersistentTasksCustomMetaData) newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)).getCurrentId()); + PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks != null) { + listener.onResponse(tasks.getTask(tasks.getCurrentId())); + } else { + listener.onResponse(null); + } } }); } @@ -79,7 +84,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param failure the reason for restarting the task or null if the task completed successfully * @param listener the listener that will be called when task is removed */ - public void completePersistentTask(long id, Exception failure, ActionListener listener) { + public void completePersistentTask(long id, Exception failure, ActionListener> listener) { final String source; if (failure != null) { logger.warn("persistent task " + id + " failed", failure); @@ -108,38 +113,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(Empty.INSTANCE); - } - }); - } - - /** - * Switches the persistent task from stopped to started mode - * - * @param id the id of a persistent task - * @param listener the listener that will be called when task is removed - */ - public void startPersistentTask(long id, ActionListener listener) { - clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); - if (tasksInProgress.hasTask(id)) { - return update(currentState, tasksInProgress - .assignTask(id, (action, request) -> getAssignement(action, currentState, request))); - } else { - throw new ResourceNotFoundException("the task with id {} doesn't exist", id); - } - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(Empty.INSTANCE); + // Using old state since in the new state the task is already gone + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id)); } }); } @@ -150,7 +125,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param id the id of a persistent task * @param listener the listener that will be called when task is removed */ - public void removePersistentTask(long id, ActionListener listener) { + public void removePersistentTask(long id, ActionListener> listener) { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -169,7 +144,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(Empty.INSTANCE); + // Using old state since in the new state the task is already gone + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(oldState, id)); } }); } @@ -182,7 +158,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements * @param status new status * @param listener the listener that will be called when task is removed */ - public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener listener) { + public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener> listener) { clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -206,7 +182,7 @@ public class PersistentTasksClusterService extends AbstractComponent implements @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(Empty.INSTANCE); + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id)); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java index 4dda6357163..92d4a714652 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksCustomMetaData.java @@ -268,7 +268,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable() { @Override - public void onResponse(long taskId) { - logger.trace("Persistent task with id {} was cancelled", taskId); + public void onResponse(CancelTasksResponse cancelTasksResponse) { + logger.trace("Persistent task with id {} was cancelled", task.getId()); } @@ -240,7 +240,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu } } - private class PublishedResponseListener implements PersistentTaskOperationListener { + private class PublishedResponseListener implements ActionListener> { private final AllocatedPersistentTask task; PublishedResponseListener(final AllocatedPersistentTask task) { @@ -249,8 +249,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu @Override - public void onResponse(long taskId) { - logger.trace("notification for task {} was successful", task.getPersistentTaskId()); + public void onResponse(PersistentTask persistentTask) { + logger.trace("notification for task {} was successful", task.getId()); if (task.markAsNotified() == false) { logger.warn("attempt to mark task {} in the {} state as NOTIFIED", task.getPersistentTaskId(), task.getState()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index 1ba41e8e3eb..46b120ea3e3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -41,16 +42,15 @@ public class PersistentTasksService extends AbstractComponent { } /** - * Creates the specified persistent action. The action is started unless the stopped parameter is equal to true. - * If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion. - * Otherwise it will remain there in the stopped state. + * Creates the specified persistent task and attempts to assign it to a node. */ - public void createPersistentActionTask(String action, Request request, - PersistentTaskOperationListener listener) { - CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request); + @SuppressWarnings("unchecked") + public void startPersistentTask(String taskName, Request request, + ActionListener> listener) { + CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(taskName, request); try { client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap( - o -> listener.onResponse(o.getTaskId()), listener::onFailure)); + o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -58,13 +58,12 @@ public class PersistentTasksService extends AbstractComponent { /** * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure - * */ - public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { + public void sendCompletionNotification(long taskId, Exception failure, ActionListener> listener) { CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure); try { - client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, ActionListener.wrap(o -> listener.onResponse(taskId), - listener::onFailure)); + client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest, + ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } @@ -73,14 +72,13 @@ public class PersistentTasksService extends AbstractComponent { /** * Cancels the persistent task. */ - public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { + void sendCancellation(long taskId, ActionListener listener) { DiscoveryNode localNode = clusterService.localNode(); CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId)); cancelTasksRequest.setReason("persistent action was removed"); try { - client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(o -> listener.onResponse(taskId), - listener::onFailure)); + client.admin().cluster().cancelTasks(cancelTasksRequest, listener); } catch (Exception e) { listener.onFailure(e); } @@ -88,28 +86,28 @@ public class PersistentTasksService extends AbstractComponent { /** * Updates status of the persistent task. - * + *

* Persistent task implementers shouldn't call this method directly and use * {@link AllocatedPersistentTask#updatePersistentStatus} instead */ - void updateStatus(long taskId, long allocationId, Task.Status status, PersistentTaskOperationListener listener) { + void updateStatus(long taskId, long allocationId, Task.Status status, ActionListener> listener) { UpdatePersistentTaskStatusAction.Request updateStatusRequest = new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); try { client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap( - o -> listener.onResponse(taskId), listener::onFailure)); + o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); } } /** - * Removes a persistent task + * Cancels if needed and removes a persistent task */ - public void removeTask(long taskId, PersistentTaskOperationListener listener) { + public void cancelPersistentTask(long taskId, ActionListener> listener) { RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); try { - client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(taskId), + client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); } catch (Exception e) { listener.onFailure(e); @@ -121,15 +119,15 @@ public class PersistentTasksService extends AbstractComponent { * waits of it. */ public void waitForPersistentTaskStatus(long taskId, Predicate> predicate, @Nullable TimeValue timeout, - WaitForPersistentTaskStatusListener listener) { + WaitForPersistentTaskStatusListener listener) { ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) { - listener.onResponse(taskId); + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId)); } else { stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - listener.onResponse(taskId); + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId)); } @Override @@ -145,15 +143,10 @@ public class PersistentTasksService extends AbstractComponent { } } - public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener { + public interface WaitForPersistentTaskStatusListener + extends ActionListener> { default void onTimeout(TimeValue timeout) { onFailure(new IllegalStateException("timed out after " + timeout)); } } - - public interface PersistentTaskOperationListener { - void onResponse(long taskId); - void onFailure(Exception e); - } - } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java index d43ab432929..75cd3dc4f93 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java @@ -26,12 +26,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; public class RemovePersistentTaskAction extends Action { public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction(); @@ -47,8 +48,8 @@ public class RemovePersistentTaskAction extends Action { @@ -98,42 +99,8 @@ public class RemovePersistentTaskAction extends Action { + PersistentTaskResponse, RemovePersistentTaskAction.RequestBuilder> { protected RequestBuilder(ElasticsearchClient client, RemovePersistentTaskAction action) { super(client, action, new Request()); @@ -146,7 +113,7 @@ public class RemovePersistentTaskAction extends Action { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -166,8 +133,8 @@ public class RemovePersistentTaskAction extends Action listener) { - persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener() { + protected final void masterOperation(final Request request, ClusterState state, + final ActionListener listener) { + persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener>() { @Override - public void onResponse(Empty empty) { - listener.onResponse(new Response(true)); + public void onResponse(PersistentTask task) { + listener.onResponse(new PersistentTaskResponse(task)); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java deleted file mode 100644 index d52c6f77693..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.persistent; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse.Empty; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; -import java.util.Objects; - -/** - * This action can be used to start a persistent task previously created using {@link CreatePersistentTaskAction} - */ -public class StartPersistentTaskAction extends Action { - - public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); - public static final String NAME = "cluster:admin/persistent/start"; - - private StartPersistentTaskAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends MasterNodeRequest { - - private long taskId; - - public Request() { - - } - - public Request(long taskId) { - this.taskId = taskId; - } - - public void setTaskId(long taskId) { - this.taskId = taskId; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - taskId = in.readLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(taskId); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Request request = (Request) o; - return taskId == request.taskId; - } - - @Override - public int hashCode() { - return Objects.hash(taskId); - } - } - - public static class Response extends AcknowledgedResponse { - public Response() { - super(); - } - - public Response(boolean acknowledged) { - super(acknowledged); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - readAcknowledged(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - writeAcknowledged(out); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AcknowledgedResponse that = (AcknowledgedResponse) o; - return isAcknowledged() == that.isAcknowledged(); - } - - @Override - public int hashCode() { - return Objects.hash(isAcknowledged()); - } - - } - - public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) { - super(client, action, new Request()); - } - - public final RequestBuilder setTaskId(long taskId) { - request.setTaskId(taskId); - return this; - } - - } - - public static class TransportAction extends TransportMasterNodeAction { - - private final PersistentTasksClusterService persistentTasksClusterService; - - @Inject - public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - PersistentTasksClusterService persistentTasksClusterService, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, StartPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, Request::new); - this.persistentTasksClusterService = persistentTasksClusterService; - } - - @Override - protected String executor() { - return ThreadPool.Names.MANAGEMENT; - } - - @Override - protected Response newResponse() { - return new Response(); - } - - @Override - protected ClusterBlockException checkBlock(Request request, ClusterState state) { - // Cluster is not affected but we look up repositories in metadata - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); - } - - @Override - protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { - persistentTasksClusterService.startPersistentTask(request.taskId, new ActionListener() { - @Override - public void onResponse(Empty empty) { - listener.onResponse(new Response(true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - } -} - - diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java index a60adc8c2d3..3b4b186c52c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskStatusAction.java @@ -27,12 +27,13 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; public class UpdatePersistentTaskStatusAction extends Action { public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction(); @@ -48,8 +49,8 @@ public class UpdatePersistentTaskStatusAction extends Action { @@ -118,42 +119,8 @@ public class UpdatePersistentTaskStatusAction extends Action { + PersistentTaskResponse, UpdatePersistentTaskStatusAction.RequestBuilder> { protected RequestBuilder(ElasticsearchClient client, UpdatePersistentTaskStatusAction action) { super(client, action, new Request()); @@ -171,7 +138,7 @@ public class UpdatePersistentTaskStatusAction extends Action { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -191,8 +158,8 @@ public class UpdatePersistentTaskStatusAction extends Action listener) { + protected final void masterOperation(final Request request, ClusterState state, + final ActionListener listener) { persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status, - new ActionListener() { + new ActionListener>() { @Override - public void onResponse(Empty empty) { - listener.onResponse(new Response(true)); + public void onResponse(PersistentTask task) { + listener.onResponse(new PersistentTaskResponse(task)); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index fca20330051..2372c3a5cfc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -46,7 +47,6 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -318,8 +318,8 @@ public class DatafeedManagerTests extends ESTestCase { when(task.getEndTime()).thenReturn(endTime); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") - PersistentTaskOperationListener listener = (PersistentTaskOperationListener) invocationOnMock.getArguments()[1]; - listener.onResponse(0L); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(mock(PersistentTask.class)); return null; }).when(task).updatePersistentStatus(any(), any()); return task; @@ -334,8 +334,8 @@ public class DatafeedManagerTests extends ESTestCase { task = spy(task); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") - PersistentTaskOperationListener listener = (PersistentTaskOperationListener) invocationOnMock.getArguments()[1]; - listener.onResponse(0L); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(mock(PersistentTask.class)); return null; }).when(task).updatePersistentStatus(any(), any()); return task; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskResponseTests.java deleted file mode 100644 index aec9ca4241b..00000000000 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/CancelPersistentTaskResponseTests.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.persistent; - -import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction.Response; -import org.elasticsearch.test.AbstractStreamableTestCase; - -public class CancelPersistentTaskResponseTests extends AbstractStreamableTestCase { - - @Override - protected Response createTestInstance() { - return new Response(randomBoolean()); - } - - @Override - protected Response createBlankInstance() { - return new Response(); - } -} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java index 760e478f996..14aca8af442 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorFullRestartIT.java @@ -5,10 +5,10 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; -import org.elasticsearch.xpack.persistent.PersistentTasksExecutorIT.PersistentTaskOperationFuture; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; @@ -45,16 +45,16 @@ public class PersistentTasksExecutorFullRestartIT extends ESIntegTestCase { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); int numberOfTasks = randomIntBetween(1, 10); long[] taskIds = new long[numberOfTasks]; - List futures = new ArrayList<>(numberOfTasks); + List>> futures = new ArrayList<>(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); + PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); - service.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + service.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { - taskIds[i] = futures.get(i).get(); + taskIds[i] = futures.get(i).get().getId(); } PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java index 895267c6b71..3982f525b44 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; @@ -52,22 +53,20 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertNoRunningTasks(); } - public static class PersistentTaskOperationFuture extends PlainActionFuture implements WaitForPersistentTaskStatusListener { - @Override - public void onResponse(long taskId) { - set(taskId); - } + public static class WaitForPersistentTaskStatusFuture + extends PlainActionFuture> + implements WaitForPersistentTaskStatusListener { } public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get(); + PlainActionFuture> future = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + long taskId = future.get().getId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(1)); + .getTasks().size(), equalTo(1)); }); TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -91,13 +90,13 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionCompletion() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get(); + PlainActionFuture> future = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + long taskId = future.get().getId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() - .getTasks().size(), equalTo(1)); + .getTasks().size(), equalTo(1)); }); TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -110,11 +109,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { public void testPersistentActionWithNoAvailableNode() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); + PlainActionFuture> future = new PlainActionFuture<>(); TestRequest testRequest = new TestRequest("Blah"); testRequest.setExecutorNodeAttr("test"); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, testRequest, future); - long taskId = future.get(); + persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, testRequest, future); + long taskId = future.get().getId(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); String newNode = internalCluster().startNode(nodeSettings); @@ -122,7 +121,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() - .size(), equalTo(1)); + .size(), equalTo(1)); }); TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -139,21 +138,21 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { }); // Remove the persistent task - PersistentTaskOperationFuture removeFuture = new PersistentTaskOperationFuture(); - persistentTasksService.removeTask(taskId, removeFuture); - assertEquals(removeFuture.get(), (Long) taskId); + PlainActionFuture> removeFuture = new PlainActionFuture<>(); + persistentTasksService.cancelPersistentTask(taskId, removeFuture); + assertEquals(removeFuture.get().getId(), taskId); } public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); - PersistentTaskOperationFuture future = new PersistentTaskOperationFuture(); - persistentTasksService.createPersistentActionTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); - long taskId = future.get(); + PlainActionFuture> future = new PlainActionFuture<>(); + persistentTasksService.startPersistentTask(TestPersistentTasksExecutor.NAME, new TestRequest("Blah"), future); + long taskId = future.get().getId(); assertBusy(() -> { // Wait for the task to start assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() - .size(), equalTo(1)); + .size(), equalTo(1)); }); TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -171,27 +170,27 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { .get().getTasks().size(), equalTo(1)); int finalI = i; - PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); + WaitForPersistentTaskStatusFuture future1 = new WaitForPersistentTaskStatusFuture<>(); persistentTasksService.waitForPersistentTaskStatus(taskId, - task -> task != null && task.isCurrentStatus()&& task.getStatus().toString() != null && + task -> task != null && task.isCurrentStatus() && task.getStatus().toString() != null && task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), TimeValue.timeValueSeconds(10), future1); - assertThat(future1.get(), equalTo(taskId)); + assertThat(future1.get().getId(), equalTo(taskId)); } - PersistentTaskOperationFuture future1 = new PersistentTaskOperationFuture(); + WaitForPersistentTaskStatusFuture future1 = new WaitForPersistentTaskStatusFuture<>(); persistentTasksService.waitForPersistentTaskStatus(taskId, task -> false, TimeValue.timeValueMillis(10), future1); assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); - PersistentTaskOperationFuture failedUpdateFuture = new PersistentTaskOperationFuture(); + PlainActionFuture> failedUpdateFuture = new PlainActionFuture<>(); persistentTasksService.updateStatus(taskId, -1, new Status("should fail"), failedUpdateFuture); assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId + " and allocation id -1 doesn't exist"); // Wait for the task to disappear - PersistentTaskOperationFuture future2 = new PersistentTaskOperationFuture(); + WaitForPersistentTaskStatusFuture future2 = new WaitForPersistentTaskStatusFuture<>(); persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); logger.info("Completing the running task"); @@ -199,7 +198,7 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase { assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); - assertThat(future2.get(), equalTo(taskId)); + assertThat(future2.get(), nullValue()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java index 28f078cd8b5..c8761e9a618 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksExecutorResponseTests.java @@ -5,17 +5,37 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; + +import java.util.Collections; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase { @Override protected PersistentTaskResponse createTestInstance() { - return new PersistentTaskResponse(randomLong()); + if (randomBoolean()) { + return new PersistentTaskResponse( + new PersistentTask(randomLong(), randomAsciiOfLength(10), + new TestPersistentTasksPlugin.TestRequest("test"), + PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); + } else { + return new PersistentTaskResponse(null); + } } @Override protected PersistentTaskResponse createBlankInstance() { return new PersistentTaskResponse(); } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Collections.singletonList( + new NamedWriteableRegistry.Entry(PersistentTaskRequest.class, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, TestPersistentTasksPlugin.TestRequest::new) + )); + } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 7bbe3038c58..4e5e708e664 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; @@ -21,8 +22,8 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.TestPersistentTasksPlugin.TestRequest; import java.io.IOException; @@ -149,10 +150,10 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { public void testTaskCancellation() { ClusterService clusterService = createClusterService(); AtomicLong capturedTaskId = new AtomicLong(); - AtomicReference capturedListener = new AtomicReference<>(); + AtomicReference> capturedListener = new AtomicReference<>(); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { @Override - public void sendCancellation(long taskId, PersistentTaskOperationListener listener) { + public void sendCancellation(long taskId, ActionListener listener) { capturedTaskId.set(taskId); capturedListener.set(listener); } @@ -203,7 +204,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // That should trigger cancellation request assertThat(capturedTaskId.get(), equalTo(localId)); // Notify successful cancellation - capturedListener.get().onResponse(localId); + capturedListener.get().onResponse(new CancelTasksResponse()); // finish or fail task if (randomBoolean()) { @@ -226,11 +227,12 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { ClusterService clusterService = createClusterService(); AtomicLong capturedTaskId = new AtomicLong(-1L); AtomicReference capturedException = new AtomicReference<>(); - AtomicReference capturedListener = new AtomicReference<>(); + AtomicReference>> capturedListener = new AtomicReference<>(); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, null, null) { @Override - public void sendCompletionNotification(long taskId, Exception failure, PersistentTaskOperationListener listener) { + public void sendCompletionNotification(long taskId, Exception failure, + ActionListener> listener) { capturedTaskId.set(taskId); capturedException.set(failure); capturedListener.set(listener); @@ -283,7 +285,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { long id = taskManager.getTasks().values().iterator().next().getParentTaskId().getId(); // This time acknowledge notification - capturedListener.get().onResponse(id); + capturedListener.get().onResponse( + new PersistentTask<>(1, TestPersistentTasksPlugin.TestPersistentTasksExecutor.NAME, new TestRequest(), null)); // Reallocate failed task to another node state = newClusterState; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java index a2b615a4d48..af58c1defee 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentTasksPlugin.java @@ -49,7 +49,7 @@ import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment; -import org.elasticsearch.xpack.persistent.PersistentTasksService.PersistentTaskOperationListener; +import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.security.InternalClient; import java.io.IOException; @@ -80,7 +80,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { return Arrays.asList( new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), - new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class) @@ -93,8 +92,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { NamedXContentRegistry xContentRegistry) { InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient); - TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, persistentTasksService, - clusterService); + TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService); PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(testPersistentAction)); return Arrays.asList( @@ -306,9 +304,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { public static final String NAME = "cluster:admin/persistent/test"; private final ClusterService clusterService; - public TestPersistentTasksExecutor(Settings settings, PersistentTasksService persistentTasksService, - ClusterService clusterService) { - super(settings, NAME, persistentTasksService, ThreadPool.Names.GENERIC); + public TestPersistentTasksExecutor(Settings settings, ClusterService clusterService) { + super(settings, NAME, ThreadPool.Names.GENERIC); this.clusterService = clusterService; } @@ -354,9 +351,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin { CountDownLatch latch = new CountDownLatch(1); Status status = new Status("phase " + phase.incrementAndGet()); logger.info("updating the task status to {}", status); - task.updatePersistentStatus(status, new PersistentTaskOperationListener() { + task.updatePersistentStatus(status, new ActionListener>() { @Override - public void onResponse(long taskId) { + public void onResponse(PersistentTask persistentTask) { logger.info("updating was successful"); latch.countDown(); } diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index 734e6dcc874..5cae99a91f7 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -139,7 +139,6 @@ indices:internal/data/write/mldeletebyquery cluster:internal/xpack/ml/job/update/process cluster:admin/xpack/ml/delete_expired_data cluster:admin/persistent/create -cluster:admin/persistent/start cluster:admin/persistent/completion cluster:admin/persistent/update_status cluster:admin/persistent/remove