From d733aa684deeafa942888a33d1ae34bcb5f3360d Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 10 Feb 2017 19:58:13 -0500 Subject: [PATCH] Make persistent task persist full cluster restart This commit moves persistent tasks from ClusterState.Custom to MetaData.Custom and adds ability for the task to remain in the metadata after completion. Original commit: elastic/x-pack-elasticsearch@751b40de2af8c34bc4824b0a278dde8e7355f30a --- .../xpack/ml/MachineLearning.java | 29 +- .../xpack/ml/action/CloseJobAction.java | 8 +- .../xpack/ml/action/DeleteDatafeedAction.java | 3 +- .../ml/action/GetDatafeedsStatsAction.java | 2 +- .../xpack/ml/action/GetJobsStatsAction.java | 4 +- .../xpack/ml/action/OpenJobAction.java | 4 +- .../xpack/ml/action/StartDatafeedAction.java | 4 +- .../xpack/ml/action/StopDatafeedAction.java | 2 +- .../xpack/ml/action/UpdateDatafeedAction.java | 3 +- .../xpack/ml/datafeed/DatafeedState.java | 9 + .../xpack/ml/job/JobManager.java | 6 +- .../xpack/ml/job/config/JobState.java | 9 + .../xpack/ml/utils/DatafeedStateObserver.java | 2 +- .../xpack/ml/utils/JobStateObserver.java | 2 +- .../CreatePersistentTaskAction.java | 245 +++++++++++ .../PersistentActionCoordinator.java | 17 +- .../persistent/PersistentActionService.java | 4 +- .../PersistentTaskClusterService.java | 280 ++++++++----- .../persistent/PersistentTasksInProgress.java | 390 ++++++++++++++++- .../RemovePersistentTaskAction.java | 2 +- .../persistent/StartPersistentTaskAction.java | 100 +++-- .../persistent/TransportPersistentAction.java | 2 +- .../xpack/ml/action/CloseJobActionTests.java | 22 +- .../xpack/ml/action/OpenJobActionTests.java | 29 +- .../ml/action/StartDatafeedActionTests.java | 20 +- .../integration/BasicDistributedJobsIT.java | 9 +- .../xpack/ml/integration/TooManyJobsIT.java | 2 +- .../ml/job/metadata/MlMetadataTests.java | 6 +- ...ersistentActionCoordinatorStatusTests.java | 30 ++ .../PersistentActionCoordinatorTests.java | 55 +-- .../PersistentActionFullRestartIT.java | 127 ++++++ .../xpack/persistent/PersistentActionIT.java | 106 +++-- .../PersistentTaskClusterServiceTests.java | 391 ++++++++++++++++++ .../PersistentTasksInProgressTests.java | 241 ++++++++++- .../StartPersistentActionRequestTests.java | 2 +- .../TestPersistentActionPlugin.java | 55 ++- .../UpdatePersistentTaskRequestTests.java | 34 ++ .../org/elasticsearch/transport/actions | 1 + 38 files changed, 1940 insertions(+), 317 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorStatusTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3ee92934794..0fc8b60a5bf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -129,6 +128,7 @@ import org.elasticsearch.xpack.persistent.PersistentActionService; import org.elasticsearch.xpack.persistent.PersistentTaskClusterService; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress; import org.elasticsearch.xpack.persistent.RemovePersistentTaskAction; +import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction; import org.elasticsearch.xpack.persistent.StartPersistentTaskAction; import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; @@ -212,27 +212,31 @@ public class MachineLearning extends Plugin implements ActionPlugin { @Override public List getNamedWriteables() { return Arrays.asList( + // Custom metadata new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new), new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new), - new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME, - PersistentActionCoordinator.Status::new), - new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), + new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + + // Persistent action requests new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new), new NamedWriteableRegistry.Entry(PersistentActionRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new), + + // Task statuses + new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME, + PersistentActionCoordinator.Status::new), new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream), new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream) - ); + ); } @Override public List getNamedXContent() { - NamedXContentRegistry.Entry entry = new NamedXContentRegistry.Entry( - MetaData.Custom.class, - new ParseField("ml"), - parser -> MlMetadata.ML_METADATA_PARSER.parse(parser, null).build() + return Arrays.asList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"), + parser -> MlMetadata.ML_METADATA_PARSER.parse(parser, null).build()), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobState.NAME), JobState::fromXContent) ); - return Collections.singletonList(entry); } @Override @@ -285,7 +289,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { persistentActionService, persistentActionRegistry, new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService) - ); + ); } public Collection nodeModules() { @@ -373,13 +377,14 @@ public class MachineLearning extends Plugin implements ActionPlugin { new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), + new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class), new ActionHandler<>(MlDeleteByQueryAction.INSTANCE, MlDeleteByQueryAction.TransportAction.class), new ActionHandler<>(UpdateProcessAction.INSTANCE, UpdateProcessAction.TransportAction.class) - ); + ); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index e44fbe6654a..c36dc601a1b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -307,7 +307,7 @@ public class CloseJobAction extends Action> p = t -> { OpenJobAction.Request storedRequest = (OpenJobAction.Request) t.getRequest(); @@ -331,7 +331,7 @@ public class CloseJobAction extends Action task = validateAndFindTask(jobId, currentState); - PersistentTasksInProgress currentTasks = currentState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress currentTasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); Map> updatedTasks = new HashMap<>(currentTasks.taskMap()); for (PersistentTaskInProgress taskInProgress : currentTasks.tasks()) { if (taskInProgress.getId() == task.getId()) { @@ -348,9 +348,9 @@ public class CloseJobAction extends Action states = new HashMap<>(); - PersistentTasksInProgress tasksInProgress = state.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasksInProgress = state.getMetaData().custom(PersistentTasksInProgress.TYPE); if (tasksInProgress != null) { Predicate> predicate = ALL.equals(request.getDatafeedId()) ? p -> true : p -> request.getDatafeedId().equals(((StartDatafeedAction.Request) p.getRequest()).getDatafeedId()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java index ce586246934..df2068f1d47 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/GetJobsStatsAction.java @@ -348,7 +348,7 @@ public class GetJobsStatsAction extends Action> listener) { String jobId = task.getJobId(); logger.debug("Get stats for job '{}'", jobId); - PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); Optional> stats = processManager.getStatistics(jobId); if (stats.isPresent()) { JobState jobState = MlMetadata.getJobState(jobId, tasks); @@ -370,7 +370,7 @@ public class GetJobsStatsAction extends Action jobStats = new AtomicArray<>(jobIds.size()); - PersistentTasksInProgress tasks = clusterService.state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterService.state().getMetaData().custom(PersistentTasksInProgress.TYPE); for (int i = 0; i < jobIds.size(); i++) { int slot = i; String jobId = jobIds.get(i); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index fae7c10bafb..ccaefec33e7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -297,7 +297,7 @@ public class OpenJobAction extends Action nodeAttributes = node.getAttributes(); String allocationEnabled = nodeAttributes.get(MachineLearning.ALLOCATION_ENABLED_ATTR); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index 684fc9ce8e1..6e40c908153 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -315,7 +315,7 @@ public class StartDatafeedAction @Override public void validate(Request request, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); DiscoveryNodes nodes = clusterState.getNodes(); StartDatafeedAction.validate(request.getDatafeedId(), mlMetadata, tasks, nodes); } @@ -373,7 +373,7 @@ public class StartDatafeedAction static DiscoveryNode selectNode(Logger logger, Request request, ClusterState clusterState) { MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE); - PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); DiscoveryNodes nodes = clusterState.getNodes(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 48a36f2cb6f..78e8516a7a2 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -146,7 +146,7 @@ public class StopDatafeedAction MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE); validate(datafeedId, mlMetadata); - PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE); PersistentTaskInProgress task = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); if (task != null) { RemovePersistentTaskAction.Request removeTaskRequest = new RemovePersistentTaskAction.Request(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java index b6a898bdaa7..08ae06bf01e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateDatafeedAction.java @@ -161,7 +161,8 @@ public class UpdateDatafeedAction extends Action predicate = (newState) -> { - PersistentTasksInProgress tasks = newState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE); DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeedId, tasks); return datafeedState == expectedState; }; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java index 253dfa8ba14..7042dd11059 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java @@ -87,7 +87,7 @@ public class JobStateObserver { @Override public boolean test(ClusterState newState) { - PersistentTasksInProgress tasks = newState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = newState.getMetaData().custom(PersistentTasksInProgress.TYPE); JobState jobState = MlMetadata.getJobState(jobId, tasks); if (jobState == JobState.FAILED) { failed = true; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java new file mode 100644 index 00000000000..1970c9c059e --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/CreatePersistentTaskAction.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * This action can be used to add the record for the persistent action to the cluster state. + */ +public class CreatePersistentTaskAction extends Action { + + public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction(); + public static final String NAME = "cluster:admin/persistent/create"; + + private CreatePersistentTaskAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public PersistentActionResponse newResponse() { + return new PersistentActionResponse(); + } + + public static class Request extends MasterNodeRequest { + + private String action; + + private PersistentActionRequest request; + + private boolean stopped; + + private boolean removeOnCompletion = true; + + public Request() { + + } + + public Request(String action, PersistentActionRequest request) { + this.action = action; + this.request = request; + this.stopped = false; + this.removeOnCompletion = true; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + action = in.readString(); + request = in.readNamedWriteable(PersistentActionRequest.class); + stopped = in.readBoolean(); + removeOnCompletion = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(action); + out.writeNamedWriteable(request); + out.writeBoolean(stopped); + out.writeBoolean(removeOnCompletion); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (this.action == null) { + validationException = addValidationError("action must be specified", validationException); + } + if (this.request == null) { + validationException = addValidationError("request must be specified", validationException); + } + return validationException; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request1 = (Request) o; + return Objects.equals(action, request1.action) && + Objects.equals(request, request1.request) && + removeOnCompletion == request1.removeOnCompletion && + stopped == request1.stopped; + } + + @Override + public int hashCode() { + return Objects.hash(action, request, removeOnCompletion, stopped); + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public PersistentActionRequest getRequest() { + return request; + } + + public void setRequest(PersistentActionRequest request) { + this.request = request; + } + + public boolean isStopped() { + return stopped; + } + + public void setStopped(boolean stopped) { + this.stopped = stopped; + } + + public boolean shouldRemoveOnCompletion() { + return removeOnCompletion; + } + + public void setRemoveOnCompletion(boolean removeOnCompletion) { + this.removeOnCompletion = removeOnCompletion; + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) { + super(client, action, new Request()); + } + + public RequestBuilder setAction(String action) { + request.setAction(action); + return this; + } + + public RequestBuilder setRequest(PersistentActionRequest persistentActionRequest) { + request.setRequest(persistentActionRequest); + return this; + } + + /** + * Indicates if the persistent task should be created in the stopped state. Defaults to false. + */ + public RequestBuilder setStopped(boolean stopped) { + request.setStopped(stopped); + return this; + } + + /** + * Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true. + */ + public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) { + request.setRemoveOnCompletion(removeOnCompletion); + return this; + } + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final PersistentTaskClusterService persistentTaskClusterService; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + PersistentTaskClusterService persistentTaskClusterService, + PersistentActionRegistry persistentActionRegistry, + PersistentActionService persistentActionService, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.persistentTaskClusterService = persistentTaskClusterService; + PersistentActionExecutor executor = new PersistentActionExecutor(threadPool); + clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry, + transportService.getTaskManager(), executor)); + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected PersistentActionResponse newResponse() { + return new PersistentActionResponse(); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected final void masterOperation(final Request request, ClusterState state, + final ActionListener listener) { + persistentTaskClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion, + new ActionListener() { + @Override + public void onResponse(Long newTaskId) { + listener.onResponse(new PersistentActionResponse(newTaskId)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + } +} + + diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java index 408c4f9e595..c2e1fd6e937 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinator.java @@ -62,8 +62,8 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl @Override public void clusterChanged(ClusterChangedEvent event) { - PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE); - PersistentTasksInProgress previousTasks = event.previousState().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress previousTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); if (Objects.equals(tasks, previousTasks) == false || event.nodesChanged()) { // We have some changes let's check if they are related to our node @@ -389,6 +389,19 @@ public class PersistentActionCoordinator extends AbstractComponent implements Cl public boolean isFragment() { return false; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return state == status.state; + } + + @Override + public int hashCode() { + return Objects.hash(state); + } } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionService.java index 1f02f892640..9686ed9fd37 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentActionService.java @@ -36,9 +36,9 @@ public class PersistentActionService extends AbstractComponent { public void sendRequest(String action, Request request, ActionListener listener) { - StartPersistentTaskAction.Request startRequest = new StartPersistentTaskAction.Request(action, request); + CreatePersistentTaskAction.Request startRequest = new CreatePersistentTaskAction.Request(action, request); try { - client.execute(StartPersistentTaskAction.INSTANCE, startRequest, listener); + client.execute(CreatePersistentTaskAction.INSTANCE, startRequest, listener); } catch (Exception e) { listener.onFailure(e); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java index 762aa8e03bb..465695b9c4d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterService.java @@ -5,11 +5,14 @@ */ package org.elasticsearch.xpack.persistent; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -19,9 +22,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -49,20 +49,19 @@ public class PersistentTaskClusterService extends AbstractComponent implements C * @param request request * @param listener the listener that will be called when task is started */ - public void createPersistentTask(String action, Request request, + public void createPersistentTask(String action, Request request, boolean stopped, + boolean removeOnCompletion, ActionListener listener) { clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - final String executorNodeId = executorNode(action, currentState, request); - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - long nextId; - if (tasksInProgress != null) { - nextId = tasksInProgress.getCurrentId() + 1; + final String executorNodeId; + if (stopped) { + executorNodeId = null; // the task is stopped no need to assign it anywhere yet } else { - nextId = 1; + executorNodeId = executorNode(action, currentState, request); } - return createPersistentTask(currentState, new PersistentTaskInProgress<>(nextId, action, request, executorNodeId)); + return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, executorNodeId)); } @Override @@ -72,7 +71,8 @@ public class PersistentTaskClusterService extends AbstractComponent implements C @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(((PersistentTasksInProgress) newState.custom(PersistentTasksInProgress.TYPE)).getCurrentId()); + listener.onResponse( + ((PersistentTasksInProgress) newState.getMetaData().custom(PersistentTasksInProgress.TYPE)).getCurrentId()); } }); } @@ -96,23 +96,81 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - if (tasksInProgress == null) { - // Nothing to do, the task was already deleted - return currentState; - } - if (failure != null) { - // If the task failed - we need to restart it on another node, otherwise we just remove it - PersistentTaskInProgress taskInProgress = tasksInProgress.getTask(id); - if (taskInProgress != null) { - String executorNode = executorNode(taskInProgress.getAction(), currentState, taskInProgress.getRequest()); - return updatePersistentTask(currentState, new PersistentTaskInProgress<>(taskInProgress, executorNode)); + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + if (failure != null) { + // If the task failed - we need to restart it on another node, otherwise we just remove it + tasksInProgress.reassignTask(id, (action, request) -> executorNode(action, currentState, request)); + } else { + tasksInProgress.finishTask(id); } - return currentState; + return update(currentState, tasksInProgress); } else { - return removePersistentTask(currentState, id); + // we don't send the error message back to the caller becase that would cause an infinite loop of notifications + logger.warn("The task {} wasn't found, status is not updated", id); + return currentState; } + } + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(Empty.INSTANCE); + } + }); + } + + /** + * Switches the persistent task from stopped to started mode + * + * @param id the id of a persistent task + * @param listener the listener that will be called when task is removed + */ + public void startPersistentTask(long id, ActionListener listener) { + clusterService.submitStateUpdateTask("start persistent task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress + .assignTask(id, (action, request) -> executorNode(action, currentState, request))); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); + } + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(Empty.INSTANCE); + } + }); + } + + /** + * Removes the persistent task + * + * @param id the id of a persistent task + * @param listener the listener that will be called when task is removed + */ + public void removePersistentTask(long id, ActionListener listener) { + clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress.removeTask(id)); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); + } } @Override @@ -138,16 +196,12 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasksInProgress = currentState.custom(PersistentTasksInProgress.TYPE); - if (tasksInProgress == null) { - // Nothing to do, the task no longer exists - return currentState; + PersistentTasksInProgress.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(id)) { + return update(currentState, tasksInProgress.updateTaskStatus(id, status)); + } else { + throw new ResourceNotFoundException("the task with id {} doesn't exist", id); } - PersistentTaskInProgress task = tasksInProgress.getTask(id); - if (task != null) { - return updatePersistentTask(currentState, new PersistentTaskInProgress<>(task, status)); - } - return currentState; } @Override @@ -162,44 +216,6 @@ public class PersistentTaskClusterService extends AbstractComponent implements C }); } - private ClusterState updatePersistentTask(ClusterState oldState, PersistentTaskInProgress newTask) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - Map> taskMap = new HashMap<>(); - taskMap.putAll(oldTasks.taskMap()); - taskMap.put(newTask.getId(), newTask); - ClusterState.Builder builder = ClusterState.builder(oldState); - PersistentTasksInProgress newTasks = new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } - - private ClusterState createPersistentTask(ClusterState oldState, PersistentTaskInProgress newTask) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - Map> taskMap = new HashMap<>(); - if (oldTasks != null) { - taskMap.putAll(oldTasks.taskMap()); - } - taskMap.put(newTask.getId(), newTask); - ClusterState.Builder builder = ClusterState.builder(oldState); - PersistentTasksInProgress newTasks = new PersistentTasksInProgress(newTask.getId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } - - private ClusterState removePersistentTask(ClusterState oldState, long taskId) { - PersistentTasksInProgress oldTasks = oldState.custom(PersistentTasksInProgress.TYPE); - if (oldTasks != null) { - Map> taskMap = new HashMap<>(); - ClusterState.Builder builder = ClusterState.builder(oldState); - taskMap.putAll(oldTasks.taskMap()); - taskMap.remove(taskId); - PersistentTasksInProgress newTasks = - new PersistentTasksInProgress(oldTasks.getCurrentId(), Collections.unmodifiableMap(taskMap)); - return builder.putCustom(PersistentTasksInProgress.TYPE, newTasks).build(); - } else { - // no tasks - nothing to do - return oldState; - } - } - private String executorNode(String action, ClusterState currentState, Request request) { TransportPersistentAction persistentAction = registry.getPersistentActionSafe(action); persistentAction.validate(request, currentState); @@ -218,30 +234,48 @@ public class PersistentTaskClusterService extends AbstractComponent implements C @Override public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster()) { - PersistentTasksInProgress tasks = event.state().custom(PersistentTasksInProgress.TYPE); - if (tasks != null && (event.nodesChanged() || event.previousState().nodes().isLocalNodeElectedMaster() == false)) { - // We need to check if removed nodes were running any of the tasks and reassign them - boolean reassignmentRequired = false; - Set removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { - if (taskInProgress.getExecutorNode() == null) { - // there is an unassigned task - we need to try assigning it - reassignmentRequired = true; - break; - } - if (removedNodes.contains(taskInProgress.getExecutorNode())) { - // The caller node disappeared, we need to assign a new caller node - reassignmentRequired = true; - break; - } - } - if (reassignmentRequired) { - reassignTasks(); - } + logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); + if (reassignmentRequired(event, this::executorNode)) { + logger.trace("task reassignment is needed"); + reassignTasks(); + } else { + logger.trace("task reassignment is not needed"); } } } + interface ExecutorNodeDecider { + String executorNode(String action, ClusterState currentState, Request request); + } + + static boolean reassignmentRequired(ClusterChangedEvent event, ExecutorNodeDecider decider) { + PersistentTasksInProgress tasks = event.state().getMetaData().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress prevTasks = event.previousState().getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasks != null && (Objects.equals(tasks, prevTasks) == false || + event.nodesChanged() || + event.routingTableChanged() || + event.previousState().nodes().isLocalNodeElectedMaster() == false)) { + // We need to check if removed nodes were running any of the tasks and reassign them + boolean reassignmentRequired = false; + Set removedNodes = event.nodesDelta().removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + for (PersistentTaskInProgress taskInProgress : tasks.tasks()) { + if (taskInProgress.isStopped() == false) { // skipping stopped tasks + if (taskInProgress.getExecutorNode() == null || removedNodes.contains(taskInProgress.getExecutorNode())) { + // there is an unassigned task or task with a disappeared node - we need to try assigning it + if (Objects.equals(taskInProgress.getRequest(), + decider.executorNode(taskInProgress.getAction(), event.state(), taskInProgress.getRequest())) == false) { + // it looks like a assignment for at least one task is possible - let's trigger reassignment + reassignmentRequired = true; + break; + } + } + } + } + return reassignmentRequired; + } + return false; + } + /** * Evaluates the cluster state and tries to assign tasks to nodes */ @@ -249,22 +283,7 @@ public class PersistentTaskClusterService extends AbstractComponent implements C clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { - PersistentTasksInProgress tasks = currentState.custom(PersistentTasksInProgress.TYPE); - ClusterState newClusterState = currentState; - DiscoveryNodes nodes = currentState.nodes(); - if (tasks != null) { - // We need to check if removed nodes were running any of the tasks and reassign them - for (PersistentTaskInProgress task : tasks.tasks()) { - if (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false) { - // there is an unassigned task - we need to try assigning it - String executorNode = executorNode(task.getAction(), currentState, task.getRequest()); - if (Objects.equals(executorNode, task.getExecutorNode()) == false) { - newClusterState = updatePersistentTask(newClusterState, new PersistentTaskInProgress<>(task, executorNode)); - } - } - } - } - return newClusterState; + return reassignTasks(currentState, logger, PersistentTaskClusterService.this::executorNode); } @Override @@ -278,4 +297,49 @@ public class PersistentTaskClusterService extends AbstractComponent implements C } }); } + + static ClusterState reassignTasks(ClusterState currentState, Logger logger, ExecutorNodeDecider decider) { + PersistentTasksInProgress tasks = currentState.getMetaData().custom(PersistentTasksInProgress.TYPE); + ClusterState clusterState = currentState; + DiscoveryNodes nodes = currentState.nodes(); + if (tasks != null) { + logger.trace("reassigning {} persistent tasks", tasks.tasks().size()); + // We need to check if removed nodes were running any of the tasks and reassign them + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.isStopped() == false && + (task.getExecutorNode() == null || nodes.nodeExists(task.getExecutorNode()) == false)) { + // there is an unassigned task - we need to try assigning it + String executorNode = decider.executorNode(task.getAction(), clusterState, task.getRequest()); + if (Objects.equals(executorNode, task.getExecutorNode()) == false) { + logger.trace("reassigning task {} from node {} to node {}", task.getId(), + task.getExecutorNode(), executorNode); + clusterState = update(clusterState, builder(clusterState).reassignTask(task.getId(), executorNode)); + } else { + logger.trace("ignoring task {} because executor nodes are the same {}", task.getId(), executorNode); + } + } else { + if (task.isStopped()) { + logger.trace("ignoring task {} because it is stopped", task.getId()); + } else { + logger.trace("ignoring task {} because it is still running", task.getId()); + } + } + } + } + return clusterState; + } + + private static PersistentTasksInProgress.Builder builder(ClusterState currentState) { + return PersistentTasksInProgress.builder(currentState.getMetaData().custom(PersistentTasksInProgress.TYPE)); + } + + private static ClusterState update(ClusterState currentState, PersistentTasksInProgress.Builder tasksInProgress) { + if (tasksInProgress.isChanged()) { + return ClusterState.builder(currentState).metaData( + MetaData.builder(currentState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasksInProgress.build()) + ).build(); + } else { + return currentState; + } + } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java index 2f06c6530b3..436206bdf9c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgress.java @@ -7,30 +7,44 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser.NamedObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task.Status; import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.elasticsearch.cluster.metadata.MetaData.ALL_CONTEXTS; + /** * A cluster state record that contains a list of all running persistent tasks */ -public final class PersistentTasksInProgress extends AbstractNamedDiffable implements ClusterState.Custom { +public final class PersistentTasksInProgress extends AbstractNamedDiffable implements MetaData.Custom { public static final String TYPE = "persistent_tasks"; + private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); + // TODO: Implement custom Diff for tasks private final Map> tasks; @@ -41,6 +55,69 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable PERSISTENT_TASKS_IN_PROGRESS_PARSER = new ObjectParser<>(TYPE, + Builder::new); + + public static final ObjectParser, Void> PERSISTENT_TASK_IN_PROGRESS_PARSER = + new ObjectParser<>("running_tasks", TaskBuilder::new); + + public static final NamedObjectParser, Void> ACTION_PARSER; + + static { + // Tasks parser initialization + PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareLong(Builder::setCurrentId, new ParseField("current_id")); + PERSISTENT_TASKS_IN_PROGRESS_PARSER.declareObjectArray(Builder::setTasks, PERSISTENT_TASK_IN_PROGRESS_PARSER, + new ParseField("running_tasks")); + + // Action parser initialization + ObjectParser, String> parser = new ObjectParser<>("named"); + parser.declareObject(ActionDescriptionBuilder::setRequest, + (p, c) -> p.namedObject(PersistentActionRequest.class, c, null), new ParseField("request")); + parser.declareObject(ActionDescriptionBuilder::setStatus, + (p, c) -> p.namedObject(Status.class, c, null), new ParseField("status")); + ACTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new ActionDescriptionBuilder<>(name), name); + + // Task parser initialization + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setId, new ParseField("id")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareNamedObjects( + (TaskBuilder taskBuilder, List> objects) -> { + if (objects.size() != 1) { + throw new IllegalArgumentException("only one action description per task is allowed"); + } + ActionDescriptionBuilder builder = objects.get(0); + taskBuilder.setAction(builder.action); + taskBuilder.setRequest(builder.request); + taskBuilder.setStatus(builder.status); + }, ACTION_PARSER, new ParseField("action")); + PERSISTENT_TASK_IN_PROGRESS_PARSER.declareStringOrNull(TaskBuilder::setExecutorNode, new ParseField("executor_node")); + } + + /** + * Private builder used in XContent parser + */ + private static class ActionDescriptionBuilder { + private final String action; + private Request request; + private Status status; + + private ActionDescriptionBuilder(String action) { + this.action = action; + } + + private ActionDescriptionBuilder setRequest(Request request) { + this.request = request; + return this; + } + + private ActionDescriptionBuilder setStatus(Status status) { + this.status = status; + return this; + } + } + public Collection> tasks() { return this.tasks.values(); } @@ -80,6 +157,11 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable action.equals(task.action) && nodeId.equals(task.executorNode)).count(); } @@ -89,6 +171,15 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable context() { + return ALL_CONTEXTS; + } + + public static PersistentTasksInProgress fromXContent(XContentParser parser) throws IOException { + return PERSISTENT_TASKS_IN_PROGRESS_PARSER.parse(parser, null).build(); + } + /** * A record that represents a single running persistent task */ @@ -97,32 +188,37 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable persistentTaskInProgress, String newExecutorNode) { - this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId + 1L, - persistentTaskInProgress.action, persistentTaskInProgress.request, persistentTaskInProgress.status, newExecutorNode); + public PersistentTaskInProgress(PersistentTaskInProgress task, boolean stopped, String newExecutorNode) { + this(task.id, task.allocationId + 1L, task.action, task.request, stopped, task.removeOnCompletion, task.status, + newExecutorNode); } - public PersistentTaskInProgress(PersistentTaskInProgress persistentTaskInProgress, Status status) { - this(persistentTaskInProgress.id, persistentTaskInProgress.allocationId, - persistentTaskInProgress.action, persistentTaskInProgress.request, status, persistentTaskInProgress.executorNode); + public PersistentTaskInProgress(PersistentTaskInProgress task, Status status) { + this(task.id, task.allocationId, task.action, task.request, task.stopped, task.removeOnCompletion, status, task.executorNode); } - private PersistentTaskInProgress(long id, long allocationId, String action, Request request, Status status, String executorNode) { + private PersistentTaskInProgress(long id, long allocationId, String action, Request request, + boolean stopped, boolean removeOnCompletion, Status status, String executorNode) { this.id = id; this.allocationId = allocationId; this.action = action; this.request = request; this.status = status; + this.stopped = stopped; + this.removeOnCompletion = removeOnCompletion; this.executorNode = executorNode; // Update parent request for starting tasks with correct parent task ID request.setParentTask("cluster", id); @@ -134,6 +230,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable { + private long id; + private long allocationId; + private String action; + private Request request; + private boolean stopped = true; + private boolean removeOnCompletion; + private Status status; + private String executorNode; + + public TaskBuilder setId(long id) { + this.id = id; + return this; + } + + public TaskBuilder setAllocationId(long allocationId) { + this.allocationId = allocationId; + return this; + } + + public TaskBuilder setAction(String action) { + this.action = action; + return this; + } + + public TaskBuilder setRequest(Request request) { + this.request = request; + return this; + } + + public TaskBuilder setStatus(Status status) { + this.status = status; + return this; + } + + + public TaskBuilder setStopped(boolean stopped) { + this.stopped = stopped; + return this; + } + + public TaskBuilder setRemoveOnCompletion(boolean removeOnCompletion) { + this.removeOnCompletion = removeOnCompletion; + return this; + } + + public TaskBuilder setExecutorNode(String executorNode) { + this.executorNode = executorNode; + return this; + } + + public PersistentTaskInProgress build() { + return new PersistentTaskInProgress<>(id, allocationId, action, request, stopped, removeOnCompletion, status, executorNode); + } + } + @Override public String getWriteableName() { return TYPE; @@ -233,8 +417,8 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(ClusterState.Custom.class, TYPE, in); + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(MetaData.Custom.class, TYPE, in); } public long getCurrentId() { @@ -253,4 +437,168 @@ public final class PersistentTasksInProgress extends AbstractNamedDiffable> tasks = new HashMap<>(); + private long currentId; + private boolean changed; + + public Builder() { + } + + public Builder(PersistentTasksInProgress tasksInProgress) { + if (tasksInProgress != null) { + tasks.putAll(tasksInProgress.tasks); + currentId = tasksInProgress.currentId; + } else { + currentId = 0; + } + } + + private Builder setCurrentId(long currentId) { + this.currentId = currentId; + return this; + } + + private Builder setTasks(List> tasks) { + for (TaskBuilder builder : tasks) { + PersistentTaskInProgress task = builder.build(); + this.tasks.put(task.getId(), task); + } + return this; + } + + /** + * Adds a new task to the builder + *

+ * After the task is added its id can be found by calling {{@link #getCurrentId()}} method. + */ + public Builder addTask(String action, Request request, boolean stopped, + boolean removeOnCompletion, String executorNode) { + changed = true; + currentId++; + tasks.put(currentId, new PersistentTaskInProgress<>(currentId, action, request, stopped, removeOnCompletion, + executorNode)); + return this; + } + + /** + * Reassigns the task to another node if the task exist + */ + public Builder reassignTask(long taskId, String executorNode) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + return this; + } + + /** + * Assigns the task to another node if the task exist and not currently assigned + *

+ * The operation is only performed if the task is not currently assigned to any nodes. To force assignment use + * {@link #reassignTask(long, BiFunction)} instead + */ + @SuppressWarnings("unchecked") + public Builder assignTask(long taskId, + BiFunction executorNodeFunc) { + PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + if (taskInProgress != null && taskInProgress.getExecutorNode() == null) { // only assign unassigned tasks + String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + if (executorNode != null || taskInProgress.isStopped()) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + } + return this; + } + + /** + * Reassigns the task to another node if the task exist + */ + @SuppressWarnings("unchecked") + public Builder reassignTask(long taskId, + BiFunction executorNodeFunc) { + PersistentTaskInProgress taskInProgress = (PersistentTaskInProgress) tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + String executorNode = executorNodeFunc.apply(taskInProgress.action, taskInProgress.request); + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, false, executorNode)); + } + return this; + } + + /** + * Updates the task status if the task exist + */ + public Builder updateTaskStatus(long taskId, Status status) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, status)); + } + return this; + } + + /** + * Removes the task if the task exist + */ + public Builder removeTask(long taskId) { + if (tasks.remove(taskId) != null) { + changed = true; + } + return this; + } + + /** + * Finishes the task if the task exist. + * + * If the task is marked with removeOnCompletion flag, it is removed from the list, otherwise it is stopped. + */ + public Builder finishTask(long taskId) { + PersistentTaskInProgress taskInProgress = tasks.get(taskId); + if (taskInProgress != null) { + changed = true; + if (taskInProgress.removeOnCompletion) { + tasks.remove(taskId); + } else { + tasks.put(taskId, new PersistentTaskInProgress<>(taskInProgress, true, null)); + } + } + return this; + } + + /** + * Checks if the task is currently present in the list + */ + public boolean hasTask(long taskId) { + return tasks.containsKey(taskId); + } + + /** + * Returns the id of the last added task + */ + public long getCurrentId() { + return currentId; + } + + /** + * Returns true if any the task list was changed since the builder was created + */ + public boolean isChanged() { + return changed; + } + + public PersistentTasksInProgress build() { + return new PersistentTasksInProgress(currentId, Collections.unmodifiableMap(tasks)); + } + } } \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java index 77af43c6c99..180defc03b0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/RemovePersistentTaskAction.java @@ -178,7 +178,7 @@ public class RemovePersistentTaskAction extends Action listener) { - persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, null, new ActionListener() { + persistentTaskClusterService.removePersistentTask(request.taskId, new ActionListener() { @Override public void onResponse(Empty empty) { listener.onResponse(new Response(true)); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java index e0f137cc811..90a7b7023a9 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/StartPersistentTaskAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -23,16 +24,17 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Objects; /** - * Internal action used by TransportPersistentAction to add the record for the persistent action to the cluster state. + * This action can be used to start persistent action previously created using {@link CreatePersistentTaskAction} */ public class StartPersistentTaskAction extends Action { public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); @@ -48,37 +50,36 @@ public class StartPersistentTaskAction extends Action { - private String action; - - private PersistentActionRequest request; + private long taskId; public Request() { } - public Request(String action, PersistentActionRequest request) { - this.action = action; - this.request = request; + public Request(long taskId) { + this.taskId = taskId; + } + + public void setTaskId(long taskId) { + this.taskId = taskId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - action = in.readString(); - request = in.readOptionalNamedWriteable(PersistentActionRequest.class); + taskId = in.readLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(action); - out.writeOptionalNamedWriteable(request); + out.writeLong(taskId); } @Override @@ -90,26 +91,65 @@ public class StartPersistentTaskAction extends Action { + StartPersistentTaskAction.Response, StartPersistentTaskAction.RequestBuilder> { protected RequestBuilder(ElasticsearchClient client, StartPersistentTaskAction action) { super(client, action, new Request()); } + + public final RequestBuilder setTaskId(long taskId) { + request.setTaskId(taskId); + return this; + } + } - public static class TransportAction extends TransportMasterNodeAction { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTaskClusterService persistentTaskClusterService; @@ -117,25 +157,20 @@ public class StartPersistentTaskAction extends Action listener) { - persistentTaskClusterService.createPersistentTask(request.action, request.request, new ActionListener() { + protected final void masterOperation(final Request request, ClusterState state, final ActionListener listener) { + persistentTaskClusterService.startPersistentTask(request.taskId, new ActionListener() { @Override - public void onResponse(Long newTaskId) { - listener.onResponse(new PersistentActionResponse(newTaskId)); + public void onResponse(Empty empty) { + listener.onResponse(new Response(true)); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java index b065aa54cc9..af74346514d 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/TransportPersistentAction.java @@ -57,7 +57,7 @@ public abstract class TransportPersistentAction selector) { long minLoad = Long.MAX_VALUE; DiscoveryNode minLoadedNode = null; - PersistentTasksInProgress persistentTasksInProgress = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress persistentTasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); for (DiscoveryNode node : clusterState.getNodes()) { if (selector.test(node)) { if (persistentTasksInProgress == null) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java index f7ae2b84e90..42ca1ecebec 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/CloseJobActionTests.java @@ -26,15 +26,15 @@ public class CloseJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), null); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); task = new PersistentTaskInProgress<>(task, randomFrom(JobState.OPENED, JobState.FAILED)); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))); + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); ClusterState result = CloseJobAction.moveJobToClosingState("job_id", csBuilder.build()); - PersistentTasksInProgress actualTasks = result.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress actualTasks = result.getMetaData().custom(PersistentTasksInProgress.TYPE); assertEquals(JobState.CLOSING, actualTasks.getTask(1L).getStatus()); MlMetadata actualMetadata = result.metaData().custom(MlMetadata.TYPE); @@ -44,8 +44,8 @@ public class CloseJobActionTests extends ESTestCase { public void testMoveJobToClosingState_jobMissing() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap())); + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap()))); expectThrows(ResourceNotFoundException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder.build())); } @@ -53,19 +53,19 @@ public class CloseJobActionTests extends ESTestCase { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); mlBuilder.putJob(buildJobBuilder("job_id").build(), false); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), null); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, null); task = new PersistentTaskInProgress<>(task, JobState.OPENING); ClusterState.Builder csBuilder1 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task))); + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)))); ElasticsearchStatusException result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder1.build())); assertEquals("cannot close job, expected job state [opened], but got [opening]", result.getMessage()); ClusterState.Builder csBuilder2 = ClusterState.builder(new ClusterName("_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build())) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap())); + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlBuilder.build()) + .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(1L, Collections.emptyMap()))); result = expectThrows(ElasticsearchStatusException.class, () -> CloseJobAction.moveJobToClosingState("job_id", csBuilder2.build())); assertEquals("cannot close job, expected job state [opened], but got [closed]", result.getMessage()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java index 49db4674dbf..f5dcf6f9f12 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/OpenJobActionTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.transport.TransportAddress; @@ -40,7 +41,7 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), "_node_id"); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); task = new PersistentTaskInProgress<>(task, randomFrom(JobState.CLOSED, JobState.FAILED)); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); @@ -48,7 +49,7 @@ public class OpenJobActionTests extends ESTestCase { OpenJobAction.validate("job_id", mlBuilder.build(), new PersistentTasksInProgress(1L, Collections.emptyMap()), nodes); OpenJobAction.validate("job_id", mlBuilder.build(), null, nodes); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), "_other_node_id"); + task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); task = new PersistentTaskInProgress<>(task, JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); OpenJobAction.validate("job_id", mlBuilder.build(), tasks, nodes); @@ -79,7 +80,7 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), "_node_id"); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_node_id"); JobState jobState = randomFrom(JobState.OPENING, JobState.OPENED, JobState.CLOSING); task = new PersistentTaskInProgress<>(task, jobState); PersistentTasksInProgress tasks1 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); @@ -88,7 +89,7 @@ public class OpenJobActionTests extends ESTestCase { () -> OpenJobAction.validate("job_id", mlBuilder.build(), tasks1, nodes)); assertEquals("[job_id] expected state [closed] or [failed], but got [" + jobState +"]", e.getMessage()); - task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), "_other_node_id"); + task = new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), false, true, "_other_node_id"); jobState = randomFrom(JobState.OPENING, JobState.CLOSING); task = new PersistentTaskInProgress<>(task, jobState); PersistentTasksInProgress tasks2 = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); @@ -112,14 +113,17 @@ public class OpenJobActionTests extends ESTestCase { .build(); Map> taskMap = new HashMap<>(); - taskMap.put(0L, new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1")); - taskMap.put(1L, new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), "_node_id1")); - taskMap.put(2L, new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), "_node_id2")); + taskMap.put(0L, + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1")); + taskMap.put(1L, + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id2"), false, true, "_node_id1")); + taskMap.put(2L, + new PersistentTaskInProgress<>(2L, OpenJobAction.NAME, new OpenJobAction.Request("job_id3"), false, true, "_node_id2")); PersistentTasksInProgress tasks = new PersistentTasksInProgress(3L, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); - cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id4", cs.build(), logger); assertEquals("_node_id3", result.getId()); } @@ -139,14 +143,15 @@ public class OpenJobActionTests extends ESTestCase { nodes.add(new DiscoveryNode("_node_name" + i, nodeId, address, nodeAttr, Collections.emptySet(), Version.CURRENT)); for (int j = 0; j < maxRunningJobsPerNode; j++) { long id = j + (maxRunningJobsPerNode * i); - taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id), nodeId)); + taskMap.put(id, new PersistentTaskInProgress<>(id, OpenJobAction.NAME, new OpenJobAction.Request("job_id" + id), + false, true, nodeId)); } } PersistentTasksInProgress tasks = new PersistentTasksInProgress(numNodes * maxRunningJobsPerNode, taskMap); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); - cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); assertNull(result); } @@ -163,12 +168,12 @@ public class OpenJobActionTests extends ESTestCase { .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), "_node_id1"); + new PersistentTaskInProgress<>(1L, OpenJobAction.NAME, new OpenJobAction.Request("job_id1"), false, true, "_node_id1"); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(1L, task)); ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name")); cs.nodes(nodes); - cs.putCustom(PersistentTasksInProgress.TYPE, tasks); + cs.metaData(MetaData.builder().putCustom(PersistentTasksInProgress.TYPE, tasks)); DiscoveryNode result = OpenJobAction.selectLeastLoadedMlNode("job_id2", cs.build(), logger); assertNull(result); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java index 6562e770d99..0ada25abd2d 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StartDatafeedActionTests.java @@ -42,7 +42,7 @@ public class StartDatafeedActionTests extends ESTestCase { mlMetadata.putDatafeed(createDatafeed("datafeed_id", job.getId(), Collections.singletonList("*"))); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), "node_id"); + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request(job.getId()), false, true, "node_id"); task = new PersistentTaskInProgress<>(task, randomFrom(JobState.FAILED, JobState.CLOSED, JobState.CLOSING, JobState.OPENING)); PersistentTasksInProgress tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); @@ -53,8 +53,8 @@ public class StartDatafeedActionTests extends ESTestCase { .build(); ClusterState.Builder cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) - .putCustom(PersistentTasksInProgress.TYPE, tasks) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksInProgress.TYPE, tasks)) .nodes(nodes); StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed_id", 0L); @@ -64,8 +64,8 @@ public class StartDatafeedActionTests extends ESTestCase { task = new PersistentTaskInProgress<>(task, JobState.OPENED); tasks = new PersistentTasksInProgress(1L, Collections.singletonMap(0L, task)); cs = ClusterState.builder(new ClusterName("cluster_name")) - .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) - .putCustom(PersistentTasksInProgress.TYPE, tasks) + .metaData(new MetaData.Builder().putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksInProgress.TYPE, tasks)) .nodes(nodes); node = StartDatafeedAction.selectNode(logger, request, cs.build()); assertEquals("node_id", node.getId()); @@ -87,7 +87,7 @@ public class StartDatafeedActionTests extends ESTestCase { .putJob(job1, false) .build(); PersistentTaskInProgress task = - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), null); + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("foo"), false, true, null); PersistentTasksInProgress tasks = new PersistentTasksInProgress(0L, Collections.singletonMap(0L, task)); DatafeedConfig datafeedConfig1 = DatafeedJobRunnerTests.createDatafeedConfig("foo-datafeed", "foo").build(); MlMetadata mlMetadata2 = new MlMetadata.Builder(mlMetadata1) @@ -112,11 +112,11 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTaskInProgress jobTask = new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - "node_id"); + false, true, "node_id"); jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - "node_id"); + false, true, "node_id"); datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); @@ -142,11 +142,11 @@ public class StartDatafeedActionTests extends ESTestCase { PersistentTaskInProgress jobTask = new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("job_id"), - "node_id2"); + false, true, "node_id2"); jobTask = new PersistentTaskInProgress<>(jobTask, JobState.OPENED); PersistentTaskInProgress datafeedTask = new PersistentTaskInProgress<>(0L, StartDatafeedAction.NAME, new StartDatafeedAction.Request("datafeed_id", 0L), - "node_id1"); + false, true, "node_id1"); datafeedTask = new PersistentTaskInProgress<>(datafeedTask, DatafeedState.STARTED); Map> taskMap = new HashMap<>(); taskMap.put(0L, jobTask); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index ed71da70bf6..d2cf6c3cfdc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.GetJobsStatsAction; @@ -129,6 +130,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { cleanupWorkaround(2); } + @TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG") public void testDedicatedMlNode() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); // start 2 non ml node that will never get a job allocated. (but ml apis are accessable from this node) @@ -152,7 +154,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { client().execute(OpenJobAction.INSTANCE, openJobRequest).get(); assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); @@ -164,12 +166,13 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { }); // stop the only running ml node + logger.info("!!!!"); internalCluster().stopRandomNode(settings -> settings.getAsBoolean(MachineLearning.ALLOCATION_ENABLED.getKey(), true)); ensureStableCluster(2); assertBusy(() -> { // job should get and remain in a failed state: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNull(task.getExecutorNode()); @@ -183,7 +186,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase { assertBusy(() -> { // job should be re-opened: ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = clusterState.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); assertNotNull(task.getExecutorNode()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 2f5cdb2b5a5..073d1ee0904 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -50,7 +50,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase { client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request("2")).actionGet(); assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.CLOSED); ClusterState state = client().admin().cluster().prepareState().get().getState(); - PersistentTasksInProgress tasks = state.custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = state.getMetaData().custom(PersistentTasksInProgress.TYPE); assertEquals(1, tasks.taskMap().size()); // now just double check that the first job is still opened: PersistentTasksInProgress.PersistentTaskInProgress task = tasks.taskMap().values().iterator().next(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java index 51dcbfe3018..46271c52ce5 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/metadata/MlMetadataTests.java @@ -148,7 +148,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { PersistentTaskInProgress task = new PersistentTaskInProgress<>( - new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), null), + new PersistentTaskInProgress<>(0L, OpenJobAction.NAME, new OpenJobAction.Request("1"), false, true, null), JobState.CLOSED ); MlMetadata.Builder builder2 = new MlMetadata.Builder(result); @@ -272,7 +272,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedConfig1.getId(), 0L); PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null); + new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null); PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); @@ -334,7 +334,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase { StartDatafeedAction.Request request = new StartDatafeedAction.Request("datafeed1", 0L); PersistentTaskInProgress taskInProgress = - new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, null); + new PersistentTaskInProgress<>(0, StartDatafeedAction.NAME, request, false, true, null); PersistentTasksInProgress tasksInProgress = new PersistentTasksInProgress(1, Collections.singletonMap(taskInProgress.getId(), taskInProgress)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorStatusTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorStatusTests.java new file mode 100644 index 00000000000..05d193f6eb1 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorStatusTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.persistent.PersistentActionCoordinator.State; +import org.elasticsearch.xpack.persistent.PersistentActionCoordinator.Status; + +import static org.hamcrest.Matchers.containsString; + +public class PersistentActionCoordinatorStatusTests extends AbstractWireSerializingTestCase { + + @Override + protected Status createTestInstance() { + return new Status(randomFrom(State.values())); + } + + @Override + protected Writeable.Reader instanceReader() { + return Status::new; + } + + public void testToString() { + assertThat(createTestInstance().toString(), containsString("state")); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java index 1aab10abad9..062f2306268 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionCoordinatorTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -22,14 +23,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.xpack.persistent.CompletionPersistentTaskAction.Response; -import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -73,18 +71,14 @@ public class PersistentActionCoordinatorTests extends ESTestCase { ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) .build(); - Map> tasks = new HashMap<>(); - long taskId = randomLong(); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); boolean added = false; if (nonLocalNodesCount > 0) { for (int i = 0; i < randomInt(5); i++) { - tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test_action", new TestRequest("other_" + i), - "other_node_" + randomInt(nonLocalNodesCount))); - taskId++; + tasks.addTask("test_action", new TestRequest("other_" + i), false, true, "other_node_" + randomInt(nonLocalNodesCount)); if (added == false && randomBoolean()) { added = true; - tasks.put(taskId, new PersistentTaskInProgress<>(taskId, "test", new TestRequest("this_param"), "this_node")); - taskId++; + tasks.addTask("test", new TestRequest("this_param"), false, true, "this_node"); } } } @@ -93,8 +87,9 @@ public class PersistentActionCoordinatorTests extends ESTestCase { logger.info("No local node action was added"); } - ClusterState newClusterState = ClusterState.builder(state) - .putCustom(PersistentTasksInProgress.TYPE, new PersistentTasksInProgress(taskId, tasks)).build(); + MetaData.Builder metaData = MetaData.builder(state.metaData()); + metaData.putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state)); if (added) { @@ -290,34 +285,26 @@ public class PersistentActionCoordinatorTests extends ESTestCase { private ClusterState addTask(ClusterState state, String action, Request request, String node) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - Map> tasks = prevTasks == null ? new HashMap<>() : new HashMap<>(prevTasks.taskMap()); - long id = prevTasks == null ? 0 : prevTasks.getCurrentId(); - tasks.put(id, new PersistentTaskInProgress<>(id, action, request, node)); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks == null ? 1 : prevTasks.getCurrentId() + 1, tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.addTask(action, request, false, true, node).build())).build(); } private ClusterState reallocateTask(ClusterState state, long taskId, String node) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - assertNotNull(prevTasks); - Map> tasks = new HashMap<>(prevTasks.taskMap()); - PersistentTaskInProgress prevTask = tasks.get(taskId); - assertNotNull(prevTask); - tasks.put(prevTask.getId(), new PersistentTaskInProgress<>(prevTask, node)); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + assertTrue(builder.hasTask(taskId)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.reassignTask(taskId, node).build())).build(); } private ClusterState removeTask(ClusterState state, long taskId) { - PersistentTasksInProgress prevTasks = state.custom(PersistentTasksInProgress.TYPE); - assertNotNull(prevTasks); - Map> tasks = new HashMap<>(prevTasks.taskMap()); - PersistentTaskInProgress prevTask = tasks.get(taskId); - assertNotNull(prevTask); - tasks.remove(prevTask.getId()); - return ClusterState.builder(state).putCustom(PersistentTasksInProgress.TYPE, - new PersistentTasksInProgress(prevTasks.getCurrentId(), tasks)).build(); + PersistentTasksInProgress.Builder builder = + PersistentTasksInProgress.builder(state.getMetaData().custom(PersistentTasksInProgress.TYPE)); + assertTrue(builder.hasTask(taskId)); + return ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).putCustom(PersistentTasksInProgress.TYPE, + builder.removeTask(taskId).build())).build(); } private class Execution { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java new file mode 100644 index 00000000000..cff1cb0a77d --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionFullRestartIT.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; +import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; + +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 1) +public class PersistentActionFullRestartIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(TestPersistentActionPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + protected boolean ignoreExternalCluster() { + return true; + } + + @TestLogging("org.elasticsearch.xpack.persistent:TRACE,org.elasticsearch.cluster.service:DEBUG") + public void testFullClusterRestart() throws Exception { + int numberOfTasks = randomIntBetween(1, 10); + long[] taskIds = new long[numberOfTasks]; + boolean[] stopped = new boolean[numberOfTasks]; + int runningTasks = 0; + for (int i = 0; i < numberOfTasks; i++) { + if (randomBoolean()) { + runningTasks++; + taskIds[i] = TestPersistentAction.INSTANCE.newRequestBuilder(client()).testParam("Blah").get().getTaskId(); + stopped[i] = false; + } else { + taskIds[i] = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client()) + .setAction(TestPersistentAction.NAME) + .setRequest(new TestRequest("Blah")) + .setStopped(true) + .get().getTaskId(); + stopped[i] = true; + } + } + final int numberOfRunningTasks = runningTasks; + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); + + if (numberOfRunningTasks > 0) { + // Make sure that at least one of the tasks is running + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() + .getTasks().size(), greaterThan(0)); + }); + } + + // Restart cluster + internalCluster().fullRestart(); + ensureYellow(); + + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(numberOfTasks)); + // Check that cluster state is correct + for (int i = 0; i < numberOfTasks; i++) { + PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + assertNotNull(task); + assertThat(task.isStopped(), equalTo(stopped[i])); + } + + logger.info("Waiting for {} original tasks to start", numberOfRunningTasks); + assertBusy(() -> { + // Wait for the running task to start automatically + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(), + equalTo(numberOfRunningTasks)); + }); + + // Start all other tasks + tasksInProgress = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksInProgress.TYPE); + for (int i = 0; i < numberOfTasks; i++) { + PersistentTaskInProgress task = tasksInProgress.getTask(taskIds[i]); + assertNotNull(task); + logger.info("checking task with id {} stopped {} node {}", task.getId(), task.isStopped(), task.getExecutorNode()); + assertThat(task.isStopped(), equalTo(stopped[i])); + assertThat(task.getExecutorNode(), stopped[i] ? nullValue() : notNullValue()); + if (stopped[i]) { + assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(task.getId()).get()); + } + } + + logger.info("Waiting for {} tasks to start", numberOfTasks); + assertBusy(() -> { + // Wait for all tasks to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks().size(), + equalTo(numberOfTasks)); + }); + + logger.info("Complete all tasks"); + // Complete the running task and make sure it finishes properly + assertThat(new TestPersistentActionPlugin.TestTasksRequestBuilder(client()).setOperation("finish").get().getTasks().size(), + equalTo(numberOfTasks)); + + assertBusy(() -> { + // Make sure the task is removed from the cluster state + assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); + }); + + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java index 845979fb471..72880e3fcbc 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentActionIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; @@ -37,22 +38,10 @@ public class PersistentActionIT extends ESIntegTestCase { return nodePlugins(); } - @Override - protected Collection> getMockPlugins() { - return super.getMockPlugins(); - } - protected boolean ignoreExternalCluster() { return true; } - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .build(); - } - @After public void cleanup() throws Exception { assertNoRunningTasks(); @@ -112,19 +101,64 @@ public class PersistentActionIT extends ESIntegTestCase { // Verifying parent assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(taskId)); assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster")); + stopOrCancelTask(firstRunningTask.getTaskId()); + } - if (randomBoolean()) { - logger.info("Completing the running task"); - // Complete the running task and make sure it finishes properly - assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(firstRunningTask.getTaskId()) - .get().getTasks().size(), equalTo(1)); - } else { - logger.info("Cancelling the running task"); - // Cancel the running task and make sure it finishes properly - assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(firstRunningTask.getTaskId()) - .get().getTasks().size(), equalTo(1)); + public void testPersistentActionCompletionWithoutRemoval() throws Exception { + boolean stopped = randomBoolean(); + long taskId = CreatePersistentTaskAction.INSTANCE.newRequestBuilder(client()) + .setAction(TestPersistentAction.NAME) + .setRequest(new TestPersistentActionPlugin.TestRequest("Blah")) + .setRemoveOnCompletion(false) + .setStopped(stopped).get().getTaskId(); + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + assertThat(tasksInProgress.getTask(taskId).isStopped(), equalTo(stopped)); + assertThat(tasksInProgress.getTask(taskId).getExecutorNode(), stopped ? nullValue() : notNullValue()); + assertThat(tasksInProgress.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); + + int numberOfIters = randomIntBetween(1, 5); // we will start/stop the action a few times before removing it + logger.info("start/stop the task {} times stating with stopped {}", numberOfIters, stopped); + for (int i = 0; i < numberOfIters; i++) { + logger.info("iteration {}", i); + if (stopped) { + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks(), + empty()); + assertAcked(StartPersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get()); + } + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get().getTasks() + .size(), equalTo(1)); + }); + TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") + .get().getTasks().get(0); + + stopOrCancelTask(firstRunningTask.getTaskId()); + + assertBusy(() -> { + // Wait for the task to finish + List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]").get() + .getTasks(); + logger.info("Found {} tasks", tasks.size()); + assertThat(tasks.size(), equalTo(0)); + }); + stopped = true; } + + assertBusy(() -> { + // Wait for the task to be marked as stopped + PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); + assertThat(tasks.tasks().size(), equalTo(1)); + assertThat(tasks.getTask(taskId).isStopped(), equalTo(true)); + assertThat(tasks.getTask(taskId).shouldRemoveOnCompletion(), equalTo(false)); + }); + + logger.info("Removing action record from cluster state"); + assertAcked(RemovePersistentTaskAction.INSTANCE.newRequestBuilder(client()).setTaskId(taskId).get()); } public void testPersistentActionWithNoAvailableNode() throws Exception { @@ -168,7 +202,8 @@ public class PersistentActionIT extends ESIntegTestCase { TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentAction.NAME + "[c]") .get().getTasks().get(0); - PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasksInProgress = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue()); @@ -181,7 +216,8 @@ public class PersistentActionIT extends ESIntegTestCase { int finalI = i; assertBusy(() -> { - PersistentTasksInProgress tasks = internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE); + PersistentTasksInProgress tasks = internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE); assertThat(tasks.tasks().size(), equalTo(1)); assertThat(tasks.tasks().iterator().next().getStatus(), notNullValue()); assertThat(tasks.tasks().iterator().next().getStatus().toString(), equalTo("{\"phase\":\"phase " + (finalI + 1) + "\"}")); @@ -195,6 +231,24 @@ public class PersistentActionIT extends ESIntegTestCase { .get().getTasks().size(), equalTo(1)); } + + private void stopOrCancelTask(TaskId taskId) { + if (randomBoolean()) { + logger.info("Completing the running task"); + // Complete the running task and make sure it finishes properly + assertThat(new TestTasksRequestBuilder(client()).setOperation("finish").setTaskId(taskId) + .get().getTasks().size(), equalTo(1)); + + } else { + logger.info("Cancelling the running task"); + // Cancel the running task and make sure it finishes properly + assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId) + .get().getTasks().size(), equalTo(1)); + } + + + } + private void assertNoRunningTasks() throws Exception { assertBusy(() -> { // Wait for the task to finish @@ -204,8 +258,8 @@ public class PersistentActionIT extends ESIntegTestCase { assertThat(tasks.size(), equalTo(0)); // Make sure the task is removed from the cluster state - assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().custom(PersistentTasksInProgress.TYPE)) - .tasks(), empty()); + assertThat(((PersistentTasksInProgress) internalCluster().clusterService().state().getMetaData() + .custom(PersistentTasksInProgress.TYPE)).tasks(), empty()); }); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java new file mode 100644 index 00000000000..0efd42ddfc4 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTaskClusterServiceTests.java @@ -0,0 +1,391 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; +import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class PersistentTaskClusterServiceTests extends ESTestCase { + + public void testReassignmentRequired() { + int numberOfIterations = randomIntBetween(10, 100); + ClusterState clusterState = initialState(); + for (int i = 0; i < numberOfIterations; i++) { + boolean significant = randomBoolean(); + ClusterState previousState = clusterState; + logger.info("inter {} significant: {}", i, significant); + if (significant) { + clusterState = significantChange(clusterState); + } else { + clusterState = insignificantChange(clusterState); + } + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, previousState); + assertThat(dumpEvent(event), significant, equalTo(PersistentTaskClusterService.reassignmentRequired(event, + new PersistentTaskClusterService.ExecutorNodeDecider() { + @Override + public String executorNode( + String action, ClusterState currentState, Request request) { + return randomNode(currentState.nodes()); + } + }))); + } + } + + public void testReassignTasksWithNoTasks() { + ClusterState clusterState = initialState(); + assertThat(reassign(clusterState).metaData().custom(PersistentTasksInProgress.TYPE), nullValue()); + } + + public void testReassignConsidersClusterStateUpdates() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( + clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 10)); + int numberOfTasks = randomIntBetween(2, 40); + for (int i = 0; i < numberOfTasks; i++) { + addTask(tasks, "should_assign", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + } + + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress, notNullValue()); + + } + + public void testReassignTasks() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder( + clusterState.metaData().custom(PersistentTasksInProgress.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 10)); + int numberOfTasks = randomIntBetween(0, 40); + for (int i = 0; i < numberOfTasks; i++) { + switch (randomInt(3)) { + case 0: + // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned + addTask(tasks, "should_assign", "assign_me", randomBoolean() ? null : "no_longer_exits", false); + break; + case 1: + // add a task assigned to non-existing node that should not get assigned + addTask(tasks, "should_not_assign", "dont_assign_me", randomBoolean() ? null : "no_longer_exits", false); + break; + case 2: + // add a stopped task assigned to non-existing node that should not get assigned + addTask(tasks, "should_not_assign", "fail_me_if_called", null, true); + break; + case 3: + addTask(tasks, "assign_one", "assign_one", randomBoolean() ? null : "no_longer_exits", false); + break; + + } + } + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksInProgress tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + assertThat(tasksInProgress, notNullValue()); + + assertThat("number of tasks shouldn't change as a result or reassignment", + numberOfTasks, equalTo(tasksInProgress.tasks().size())); + + int assignOneCount = 0; + + for (PersistentTaskInProgress task : tasksInProgress.tasks()) { + if (task.isStopped()) { + assertThat("stopped tasks should be never assigned", task.getExecutorNode(), nullValue()); + } else { + switch (task.getAction()) { + case "should_assign": + assertThat(task.getExecutorNode(), notNullValue()); + if (clusterState.nodes().nodeExists(task.getExecutorNode()) == false) { + logger.info(clusterState.metaData().custom(PersistentTasksInProgress.TYPE).toString()); + } + assertThat("task should be assigned to a node that is in the cluster, was assigned to " + task.getExecutorNode(), + clusterState.nodes().nodeExists(task.getExecutorNode()), equalTo(true)); + break; + case "should_not_assign": + assertThat(task.getExecutorNode(), nullValue()); + break; + case "assign_one": + if (task.getExecutorNode() != null) { + assignOneCount++; + assertThat("more than one assign_one tasks are assigned", assignOneCount, lessThanOrEqualTo(1)); + } + break; + default: + fail("Unknown action " + task.getAction()); + } + } + } + } + + + private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { + for (int i = 0; i < nonLocalNodesCount; i++) { + nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); + } + } + + private ClusterState reassign(ClusterState clusterState) { + return PersistentTaskClusterService.reassignTasks(clusterState, logger, + new PersistentTaskClusterService.ExecutorNodeDecider() { + @Override + public String executorNode( + String action, ClusterState currentState, Request request) { + TestRequest testRequest = (TestRequest) request; + switch (testRequest.getTestParam()) { + case "assign_me": + return randomNode(currentState.nodes()); + case "dont_assign_me": + return null; + case "fail_me_if_called": + fail("the decision decider shouldn't be called on this task"); + return null; + case "assign_one": + return assignOnlyOneTaskAtATime(currentState); + default: + fail("unknown param " + testRequest.getTestParam()); + } + return null; + } + }); + + } + + private String assignOnlyOneTaskAtATime(ClusterState clusterState) { + DiscoveryNodes nodes = clusterState.nodes(); + PersistentTasksInProgress tasksInProgress = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasksInProgress.findTasks("assign_one", + task -> task.isStopped() == false && nodes.nodeExists(task.getExecutorNode())).isEmpty()) { + return randomNode(clusterState.nodes()); + } else { + return null; + } + } + + private String randomNode(DiscoveryNodes nodes) { + if (nodes.getNodes().isEmpty()) { + return null; + } + List nodeList = new ArrayList<>(); + for (ObjectCursor node : nodes.getNodes().keys()) { + nodeList.add(node.value); + } + return randomFrom(nodeList); + + } + + private String dumpEvent(ClusterChangedEvent event) { + return "nodes_changed: " + event.nodesChanged() + + " nodes_removed:" + event.nodesRemoved() + + " routing_table_changed:" + event.routingTableChanged() + + " tasks: " + event.state().metaData().custom(PersistentTasksInProgress.TYPE); + } + + private ClusterState significantChange(ClusterState clusterState) { + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (tasks != null) { + if (randomBoolean()) { + // + boolean removedNode = false; + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.getExecutorNode() != null && clusterState.nodes().nodeExists(task.getExecutorNode())) { + logger.info("removed node {}", task.getExecutorNode()); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(task.getExecutorNode())); + return builder.build(); + } + } + } + } + boolean tasksOrNodesChanged = false; + // add a new unassigned task + if (hasUnassigned(tasks, clusterState.nodes()) == false) { + // we don't have any unassigned tasks - add some + logger.info("added random task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, false); + tasksOrNodesChanged = true; + } + // add a node if there are unassigned tasks + if (clusterState.nodes().getNodes().isEmpty()) { + logger.info("added random node"); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10)))); + tasksOrNodesChanged = true; + } + + if (tasksOrNodesChanged == false) { + // change routing table to simulate a change + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + } + return builder.build(); + } + + private ClusterState insignificantChange(ClusterState clusterState) { + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksInProgress tasks = clusterState.getMetaData().custom(PersistentTasksInProgress.TYPE); + if (randomBoolean()) { + if (hasUnassigned(tasks, clusterState.nodes()) == false) { + // we don't have any unassigned tasks - adding a node or changing a routing table shouldn't affect anything + if (randomBoolean()) { + logger.info("added random node"); + builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode(randomAsciiOfLength(10)))); + } + if (randomBoolean()) { + // add unassigned task in stopped state + logger.info("added random stopped task"); + addRandomTask(builder, MetaData.builder(clusterState.metaData()), PersistentTasksInProgress.builder(tasks), null, true); + return builder.build(); + } else { + logger.info("changed routing table"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()); + RoutingTable.Builder routingTable = RoutingTable.builder(clusterState.routingTable()); + changeRoutingTable(metaData, routingTable); + builder.metaData(metaData).routingTable(routingTable.build()); + } + return builder.build(); + } + } + if (randomBoolean()) { + // remove a node that doesn't have any tasks assigned to it and it's not the master node + for (DiscoveryNode node : clusterState.nodes()) { + if (hasTasksAssignedTo(tasks, node.getId()) == false && "this_node".equals(node.getId()) == false) { + logger.info("removed unassigned node {}", node.getId()); + return builder.nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(node.getId())).build(); + } + } + } + + if (randomBoolean()) { + // clear the task + if (randomBoolean()) { + logger.info("removed all tasks"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksInProgress.TYPE, + PersistentTasksInProgress.builder().build()); + return builder.metaData(metaData).build(); + } else { + logger.info("set task custom to null"); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).removeCustom(PersistentTasksInProgress.TYPE); + return builder.metaData(metaData).build(); + } + } + logger.info("removed all unassigned tasks and changed routing table"); + PersistentTasksInProgress.Builder tasksBuilder = PersistentTasksInProgress.builder(tasks); + if (tasks != null) { + for (PersistentTaskInProgress task : tasks.tasks()) { + if (task.getExecutorNode() == null) { + tasksBuilder.removeTask(task.getId()); + } + } + } + // Just add a random index - that shouldn't change anything + IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).put(indexMetaData, false) + .putCustom(PersistentTasksInProgress.TYPE, tasksBuilder.build()); + return builder.metaData(metaData).build(); + } + + private boolean hasUnassigned(PersistentTasksInProgress tasks, DiscoveryNodes discoveryNodes) { + if (tasks == null || tasks.tasks().isEmpty()) { + return false; + } + return tasks.tasks().stream().anyMatch(task -> + task.isStopped() == false && + (task.getExecutorNode() == null || discoveryNodes.nodeExists(task.getExecutorNode()))); + } + + private boolean hasTasksAssignedTo(PersistentTasksInProgress tasks, String nodeId) { + return tasks != null && tasks.tasks().stream().anyMatch( + task -> nodeId.equals(task.getExecutorNode())) == false; + } + + private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuilder, + MetaData.Builder metaData, PersistentTasksInProgress.Builder tasks, + String node, + boolean stopped) { + return clusterStateBuilder.metaData(metaData.putCustom(PersistentTasksInProgress.TYPE, + tasks.addTask(randomAsciiOfLength(10), new TestRequest(randomAsciiOfLength(10)), stopped, randomBoolean(), node).build())); + } + + private void addTask(PersistentTasksInProgress.Builder tasks, String action, String param, String node, boolean stopped) { + tasks.addTask(action, new TestRequest(param), stopped, randomBoolean(), node); + } + + private DiscoveryNode newNode(String nodeId) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA))), + Version.CURRENT); + } + + + private ClusterState initialState() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + int randomIndices = randomIntBetween(0, 5); + for (int i = 0; i < randomIndices; i++) { + changeRoutingTable(metaData, routingTable); + } + + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + nodes.add(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "this_node")); + nodes.localNodeId("this_node"); + nodes.masterNodeId("this_node"); + + return ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable.build()) + .build(); + } + + private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData indexMetaData = IndexMetaData.builder(randomAsciiOfLength(10)) + .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + metaData.put(indexMetaData, false); + routingTable.addAsNew(indexMetaData); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java index 88ea66538ee..9912b38f10f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksInProgressTests.java @@ -5,48 +5,263 @@ */ package org.elasticsearch.xpack.persistent; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.MetaData.Custom; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.AbstractDiffableSerializationTestCase; +import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.Builder; import org.elasticsearch.xpack.persistent.PersistentTasksInProgress.PersistentTaskInProgress; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; +import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.Collections; -public class PersistentTasksInProgressTests extends AbstractWireSerializingTestCase { +import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_GATEWAY; +import static org.elasticsearch.cluster.metadata.MetaData.CONTEXT_MODE_SNAPSHOT; + +public class PersistentTasksInProgressTests extends AbstractDiffableSerializationTestCase { @Override protected PersistentTasksInProgress createTestInstance() { int numberOfTasks = randomInt(10); - Map> entries = new HashMap<>(); + PersistentTasksInProgress.Builder tasks = PersistentTasksInProgress.builder(); for (int i = 0; i < numberOfTasks; i++) { - PersistentTaskInProgress taskInProgress = new PersistentTaskInProgress<>( - randomLong(), randomAsciiOfLength(10), new TestPersistentActionPlugin.TestRequest(randomAsciiOfLength(10)), - randomAsciiOfLength(10)); + tasks.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), + randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); if (randomBoolean()) { // From time to time update status - taskInProgress = new PersistentTaskInProgress<>(taskInProgress, new Status(randomAsciiOfLength(10))); + tasks.updateTaskStatus(tasks.getCurrentId(), new Status(randomAsciiOfLength(10))); } - entries.put(taskInProgress.getId(), taskInProgress); } - return new PersistentTasksInProgress(randomLong(), entries); + return tasks.build(); } @Override - protected Writeable.Reader instanceReader() { + protected Writeable.Reader instanceReader() { return PersistentTasksInProgress::new; } @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Arrays.asList( - new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestPersistentActionPlugin.TestRequest::new), + new Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), + new Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), + new Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new Entry(Task.Status.class, Status.NAME, Status::new) )); } + + @Override + protected Custom makeTestChanges(Custom testInstance) { + PersistentTasksInProgress tasksInProgress = (PersistentTasksInProgress) testInstance; + Builder builder = new Builder(); + switch (randomInt(3)) { + case 0: + addRandomTask(builder); + break; + case 1: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.reassignTask(pickRandomTask(tasksInProgress), randomAsciiOfLength(10)); + } + break; + case 2: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.updateTaskStatus(pickRandomTask(tasksInProgress), randomBoolean() ? new Status(randomAsciiOfLength(10)) : null); + } + break; + case 3: + if (tasksInProgress.tasks().isEmpty()) { + addRandomTask(builder); + } else { + builder.removeTask(pickRandomTask(tasksInProgress)); + } + break; + } + return builder.build(); + } + + @Override + protected Writeable.Reader> diffReader() { + return PersistentTasksInProgress::readDiffFrom; + } + + @Override + protected PersistentTasksInProgress doParseInstance(XContentParser parser) throws IOException { + return PersistentTasksInProgress.fromXContent(parser); + } + + @Override + protected XContentBuilder toXContent(Custom instance, XContentType contentType) throws IOException { + return toXContent(instance, contentType, new ToXContent.MapParams( + Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, MetaData.XContentContext.API.toString()))); + } + + protected XContentBuilder toXContent(Custom instance, XContentType contentType, ToXContent.MapParams params) throws IOException { + // We need all attribute to be serialized/de-serialized for testing + XContentBuilder builder = XContentFactory.contentBuilder(contentType); + if (randomBoolean()) { + builder.prettyPrint(); + } + if (instance.isFragment()) { + builder.startObject(); + } + instance.toXContent(builder, params); + if (instance.isFragment()) { + builder.endObject(); + } + return builder; + } + + private Builder addRandomTask(Builder builder) { + builder.addTask(TestPersistentAction.NAME, new TestRequest(randomAsciiOfLength(10)), + randomBoolean(), randomBoolean(), randomAsciiOfLength(10)); + return builder; + } + + private long pickRandomTask(PersistentTasksInProgress testInstance) { + return randomFrom(new ArrayList<>(testInstance.tasks())).getId(); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return new NamedXContentRegistry(Arrays.asList( + new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME), + TestRequest::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentAction.NAME), + Status::fromXContent) + )); + } + + @SuppressWarnings("unchecked") + public void testSerializationContext() throws Exception { + PersistentTasksInProgress testInstance = createTestInstance(); + for (int i = 0; i < randomInt(10); i++) { + testInstance = (PersistentTasksInProgress) makeTestChanges(testInstance); + } + + ToXContent.MapParams params = new ToXContent.MapParams( + Collections.singletonMap(MetaData.CONTEXT_MODE_PARAM, randomFrom(CONTEXT_MODE_SNAPSHOT, CONTEXT_MODE_GATEWAY))); + + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = toXContent(testInstance, xContentType, params); + XContentBuilder shuffled = shuffleXContent(builder); + + XContentParser parser = createParser(XContentFactory.xContent(xContentType), shuffled.bytes()); + PersistentTasksInProgress newInstance = doParseInstance(parser); + assertNotSame(newInstance, testInstance); + + assertEquals(testInstance.tasks().size(), newInstance.tasks().size()); + for (PersistentTaskInProgress testTask : testInstance.tasks()) { + PersistentTaskInProgress newTask = (PersistentTaskInProgress) newInstance.getTask(testTask.getId()); + assertNotNull(newTask); + + // Things that should be serialized + assertEquals(testTask.getAction(), newTask.getAction()); + assertEquals(testTask.getId(), newTask.getId()); + assertEquals(testTask.getStatus(), newTask.getStatus()); + assertEquals(testTask.getRequest(), newTask.getRequest()); + assertEquals(testTask.isStopped(), newTask.isStopped()); + + // Things that shouldn't be serialized + assertEquals(0, newTask.getAllocationId()); + assertNull(newTask.getExecutorNode()); + } + } + + public void testBuilder() { + PersistentTasksInProgress persistentTasksInProgress = null; + long lastKnownTask = -1; + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final Builder builder; + if (randomBoolean()) { + builder = new Builder(); + } else { + builder = new Builder(persistentTasksInProgress); + } + boolean changed = false; + for (int j = 0; j < randomIntBetween(1, 10); j++) { + switch (randomInt(5)) { + case 0: + lastKnownTask = addRandomTask(builder).getCurrentId(); + changed = true; + break; + case 1: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + if (randomBoolean()) { + builder.reassignTask(lastKnownTask, randomAsciiOfLength(10)); + } else { + builder.reassignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + } + break; + case 2: + if (builder.hasTask(lastKnownTask)) { + PersistentTaskInProgress task = builder.build().getTask(lastKnownTask); + if (randomBoolean()) { + // Trying to reassign to the same node + builder.assignTask(lastKnownTask, (s, request) -> task.getExecutorNode()); + // should change if the task was stopped AND unassigned + if (task.getExecutorNode() == null && task.isStopped()) { + changed = true; + } + } else { + // Trying to reassign to a different node + builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + // should change if the task was unassigned + if (task.getExecutorNode() == null) { + changed = true; + } + } + } else { + // task doesn't exist - shouldn't change + builder.assignTask(lastKnownTask, (s, request) -> randomAsciiOfLength(10)); + } + break; + case 3: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAsciiOfLength(10)) : null); + break; + case 4: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.removeTask(lastKnownTask); + break; + case 5: + if (builder.hasTask(lastKnownTask)) { + changed = true; + } + builder.finishTask(lastKnownTask); + break; + } + } + assertEquals(changed, builder.isChanged()); + persistentTasksInProgress = builder.build(); + } + + } + } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java index a4395ebd6d2..d3a6f54d73e 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/StartPersistentActionRequestTests.java @@ -7,7 +7,7 @@ package org.elasticsearch.xpack.persistent; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; -import org.elasticsearch.xpack.persistent.StartPersistentTaskAction.Request; +import org.elasticsearch.xpack.persistent.CreatePersistentTaskAction.Request; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestPersistentAction; import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.TestRequest; import org.elasticsearch.test.AbstractStreamableTestCase; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java index a4f693d6ad3..f2c86f5c0e0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/TestPersistentActionPlugin.java @@ -23,8 +23,10 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; @@ -33,8 +35,10 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; @@ -58,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Objects.requireNonNull; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.junit.Assert.assertTrue; @@ -73,6 +78,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { return Arrays.asList( new ActionHandler<>(TestPersistentAction.INSTANCE, TransportTestPersistentAction.class), new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class), + new ActionHandler<>(CreatePersistentTaskAction.INSTANCE, CreatePersistentTaskAction.TransportAction.class), new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class), new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class), new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class), @@ -100,14 +106,32 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { new NamedWriteableRegistry.Entry(PersistentActionRequest.class, TestPersistentAction.NAME, TestRequest::new), new NamedWriteableRegistry.Entry(Task.Status.class, PersistentActionCoordinator.Status.NAME, PersistentActionCoordinator.Status::new), - new NamedWriteableRegistry.Entry(ClusterState.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), + new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::new), new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom), new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) ); } + @Override + public List getNamedXContent() { + return Arrays.asList( + new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(PersistentTasksInProgress.TYPE), + PersistentTasksInProgress::fromXContent), + new NamedXContentRegistry.Entry(PersistentActionRequest.class, new ParseField(TestPersistentAction.NAME), + TestRequest::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(Status.NAME), Status::fromXContent) + ); + } + public static class TestRequest extends PersistentActionRequest { + public static final ConstructingObjectParser REQUEST_PARSER = + new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new TestRequest((String) args[0])); + + static { + REQUEST_PARSER.declareString(constructorArg(), new ParseField("param")); + } + private String executorNodeAttr = null; private String responseNode = null; @@ -171,10 +195,15 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field("param", testParam); builder.endObject(); return builder; } + public static TestRequest fromXContent(XContentParser parser) throws IOException { + return REQUEST_PARSER.parse(parser, null); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -241,6 +270,13 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { private final String phase; + public static final ConstructingObjectParser STATUS_PARSER = + new ConstructingObjectParser<>(TestPersistentAction.NAME, args -> new Status((String) args[0])); + + static { + STATUS_PARSER.declareString(constructorArg(), new ParseField("phase")); + } + public Status(String phase) { this.phase = requireNonNull(phase, "Phase cannot be null"); } @@ -262,6 +298,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { return builder; } + public static Task.Status fromXContent(XContentParser parser) throws IOException { + return STATUS_PARSER.parse(parser, null); + } + + @Override public boolean isFragment() { return false; @@ -305,7 +346,7 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, TestPersistentAction.NAME, false, threadPool, transportService, persistentActionService, persistentActionRegistry, actionFilters, indexNameExpressionResolver, TestRequest::new, - ThreadPool.Names.MANAGEMENT); + ThreadPool.Names.GENERIC); this.transportService = transportService; } @@ -328,8 +369,9 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { while (true) { // wait for something to happen assertTrue(awaitBusy(() -> testTask.isCancelled() || - testTask.getOperation() != null || - transportService.lifecycleState() != Lifecycle.State.STARTED)); // speedup finishing on closed nodes + testTask.getOperation() != null || + transportService.lifecycleState() != Lifecycle.State.STARTED, // speedup finishing on closed nodes + 30, TimeUnit.SECONDS)); // This can take a while during large cluster restart if (transportService.lifecycleState() != Lifecycle.State.STARTED) { return; } @@ -416,6 +458,11 @@ public class TestPersistentActionPlugin extends Plugin implements ActionPlugin { public void setOperation(String operation) { this.operation = operation; } + + @Override + public String toString() { + return "TestTask[" + this.getId() + ", " + this.getParentTaskId() + ", " + this.getOperation() + "]"; + } } static class TestTaskResponse implements Writeable { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java new file mode 100644 index 00000000000..33b8be1f659 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/UpdatePersistentTaskRequestTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.persistent; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.persistent.TestPersistentActionPlugin.Status; +import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction.Request; + +import java.util.Collections; + +public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomLong(), new Status(randomAsciiOfLength(10))); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Collections.singletonList( + new NamedWriteableRegistry.Entry(Task.Status.class, Status.NAME, Status::new) + )); + } +} \ No newline at end of file diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index 0f76d78213f..9c2c41bbfc0 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -136,6 +136,7 @@ cluster:admin/ml/anomaly_detectors/open cluster:admin/ml/job/update indices:data/write/delete/mlbyquery cluster:admin/ml/job/update/process +cluster:admin/persistent/create cluster:admin/persistent/start cluster:admin/persistent/completion cluster:admin/persistent/update_status