From d114a55b993f5716e52ac6724ff36677f10525ac Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 14 Apr 2017 10:43:21 +0200 Subject: [PATCH] [ML] Make open job and start datafeed apis master node actions and let close job and stop datafeed apis redirect to elected master node. This is for cluster state observation purposes, so that a subsequent open and then close job or start and then stop datafeed see the same local cluster state and sanity validation doesn't fail. Original commit: elastic/x-pack-elasticsearch@21a63184b93bcf0b14142c033b4c7c10caa16857 --- .../xpack/ml/action/CloseJobAction.java | 90 +++++++------ .../xpack/ml/action/FlushJobAction.java | 6 +- .../xpack/ml/action/OpenJobAction.java | 43 +++++- .../xpack/ml/action/PostDataAction.java | 5 +- .../xpack/ml/action/StartDatafeedAction.java | 48 +++++-- .../xpack/ml/action/StopDatafeedAction.java | 124 ++++++++++-------- .../ml/action/TransportJobTaskAction.java | 24 +--- .../xpack/ml/action/UpdateProcessAction.java | 5 +- .../autodetect/AutodetectProcessManager.java | 11 +- .../ml/integration/MlBasicMultiNodeIT.java | 33 +---- 10 files changed, 212 insertions(+), 177 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 9126ded7759..e80f176d321 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; @@ -20,6 +21,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; @@ -35,6 +37,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -291,47 +294,58 @@ public class CloseJobAction extends Action listener) { - /* - * Closing of multiple jobs: - * - * 1. Resolve and validate jobs first: if any job does not meet the - * criteria (e.g. open datafeed), fail immediately, do not close any - * job - * - * 2. Internally a task request is created for every job, so there - * are n inner tasks for 1 user request - * - * 3. Collect n inner task results or failures and send 1 outer - * result/failure - */ - - ClusterState state = clusterService.state(); - request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]); - if (request.resolvedJobIds.length == 0) { - listener.onResponse(new Response(true)); - return; - } - - Set executorNodes = new HashSet<>(); - for (String resolvedJobId : request.resolvedJobIds) { - JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); - if (jobTask == null || jobTask.isAssigned() == false) { - String message = "Cannot perform requested action because job [" + resolvedJobId - + "] is not open"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + if (nodes.isLocalNodeElectedMaster() == false) { + // Delegates close job to elected master node, so it becomes the coordinating node. + // See comment in OpenJobAction.Transport class for more information. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master node")); } else { - executorNodes.add(jobTask.getExecutorNode()); + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, Response::new)); } - } - - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - if (request.isForce()) { - forceCloseJob(state, request, listener); } else { - normalCloseJob(state, task, request, listener); + /* + * Closing of multiple jobs: + * + * 1. Resolve and validate jobs first: if any job does not meet the + * criteria (e.g. open datafeed), fail immediately, do not close any + * job + * + * 2. Internally a task request is created for every job, so there + * are n inner tasks for 1 user request + * + * 3. Collect n inner task results or failures and send 1 outer + * result/failure + */ + request.resolvedJobIds = resolveAndValidateJobId(request.getJobId(), state).toArray(new String[0]); + if (request.resolvedJobIds.length == 0) { + listener.onResponse(new Response(true)); + return; + } + + Set executorNodes = new HashSet<>(); + for (String resolvedJobId : request.resolvedJobIds) { + JobManager.getJobOrThrowIfUnknown(state, resolvedJobId); + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(resolvedJobId, tasks); + if (jobTask == null || jobTask.isAssigned() == false) { + String message = "Cannot perform requested action because job [" + resolvedJobId + + "] is not open"; + listener.onFailure(ExceptionsHelper.conflictStatusException(message)); + return; + } else { + executorNodes.add(jobTask.getExecutorNode()); + } + } + + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + if (request.isForce()) { + forceCloseJob(state, request, listener); + } else { + normalCloseJob(state, task, request, listener); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java index 3e8cf8efd43..1a17402275e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/FlushJobAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -236,7 +235,7 @@ public class FlushJobAction extends Action { + public static class TransportAction extends TransportJobTaskAction { @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, @@ -255,8 +254,7 @@ public class FlushJobAction extends Action listener, ClusterState state) { + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { InterimResultsParams.Builder paramsBuilder = InterimResultsParams.builder(); paramsBuilder.calcInterim(request.getCalcInterim()); if (request.getAdvanceTime() != null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 1243af7213d..2747c2b190f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -11,14 +11,16 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -88,7 +90,7 @@ public class OpenJobAction extends Action implements ToXContent { public static Request fromXContent(XContentParser parser) { return parseRequest(null, parser); @@ -356,22 +358,41 @@ public class OpenJobAction extends Action { + // This class extends from TransportMasterNodeAction for cluster state observing purposes. + // The close job api also redirect the elected master node. + // The master node will wait for the job to be opened by checking the persistent task's status and then return. + // To ensure that a subsequent close job call will see that same task status (and sanity validation doesn't fail) + // both open and close job apis redirect to the elected master node. + // In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off. + // The open job api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue. + public static class TransportAction extends TransportMasterNodeAction { private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, - PersistentTasksService persistentTasksService, ActionFilters actionFilters, + ClusterService clusterService, PersistentTasksService persistentTasksService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); this.licenseState = licenseState; this.persistentTasksService = persistentTasksService; } @Override - protected void doExecute(Request request, ActionListener listener) { + protected String executor() { + // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), + // so we can do this on the network thread + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { ActionListener> finalListener = new ActionListener>() { @@ -395,6 +416,14 @@ public class OpenJobAction extends Action listener) { JobPredicate predicate = new JobPredicate(); persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.timeout, diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java index 6d999eee4de..0e4334b3b50 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PostDataAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -233,7 +232,7 @@ public class PostDataAction extends Action { + public static class TransportAction extends TransportJobTaskAction { @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, @@ -252,7 +251,7 @@ public class PostDataAction extends Action listener, ClusterState state) { + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build(); DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription())); try { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java index fa571833e43..8380140b1f5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StartDatafeedAction.java @@ -12,19 +12,22 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; @@ -95,7 +98,7 @@ public class StartDatafeedAction return new Response(); } - public static class Request extends ActionRequest implements ToXContent { + public static class Request extends MasterNodeRequest implements ToXContent { public static Request fromXContent(XContentParser parser) { return parseRequest(null, parser); @@ -403,22 +406,41 @@ public class StartDatafeedAction } } - public static class TransportAction extends HandledTransportAction { + // This class extends from TransportMasterNodeAction for cluster state observing purposes. + // The stop datafeed api also redirect the elected master node. + // The master node will wait for the datafeed to be started by checking the persistent task's status and then return. + // To ensure that a subsequent stop datafeed call will see that same task status (and sanity validation doesn't fail) + // both start and stop datafeed apis redirect to the elected master node. + // In case of instability persistent tasks checks may fail and that is ok, in that case all bets are off. + // The start datafeed api is a low through put api, so the fact that we redirect to elected master node shouldn't be an issue. + public static class TransportAction extends TransportMasterNodeAction { private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; @Inject - public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, XPackLicenseState licenseState, - PersistentTasksService persistentTasksService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, + XPackLicenseState licenseState, PersistentTasksService persistentTasksService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new); this.licenseState = licenseState; this.persistentTasksService = persistentTasksService; } @Override - protected void doExecute(Request request, ActionListener listener) { + protected String executor() { + // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), + // so we can do this on the network thread + return ThreadPool.Names.SAME; + } + + @Override + protected Response newResponse() { + return new Response(); + } + + @Override + protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { DatafeedParams params = request.params; if (licenseState.isMachineLearningAllowed()) { ActionListener> finalListener = new ActionListener>() { @@ -442,6 +464,14 @@ public class StartDatafeedAction } } + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + // We only delegate here to PersistentTasksService, but if there is a metadata writeblock, + // then delagating to PersistentTasksService doesn't make a whole lot of sense, + // because PersistentTasksService will then fail. + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + void waitForDatafeedStarted(String taskId, DatafeedParams params, ActionListener listener) { Predicate> predicate = persistentTask -> { if (persistentTask == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index 589c6bc8fae..4271a38f469 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.FailedNodeException; @@ -20,6 +21,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; @@ -33,6 +35,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -264,66 +267,79 @@ public class StopDatafeedAction @Override protected void doExecute(Task task, Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); - PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - - List resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks); - if (resolvedDatafeeds.isEmpty()) { - listener.onResponse(new Response(true)); - return; - } - request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()])); - - if (request.force) { - final AtomicInteger counter = new AtomicInteger(); - final AtomicArray failures = new AtomicArray<>(resolvedDatafeeds.size()); - - for (String datafeedId : resolvedDatafeeds) { - PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); - if (datafeedTask != null) { - persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener>() { - @Override - public void onResponse(PersistentTask persistentTask) { - if (counter.incrementAndGet() == resolvedDatafeeds.size()) { - sendResponseOrFailure(request.getDatafeedId(), listener, failures); - } - } - @Override - public void onFailure(Exception e) { - final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); - if (slot == resolvedDatafeeds.size()) { - sendResponseOrFailure(request.getDatafeedId(), listener, failures); - } - } - }); - } else { - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + - "datafeed's task could not be found."; - logger.warn(msg); - final int slot = counter.incrementAndGet(); - failures.set(slot - 1, new RuntimeException(msg)); - if (slot == resolvedDatafeeds.size()) { - sendResponseOrFailure(request.getDatafeedId(), listener, failures); - } - } + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + if (nodes.isLocalNodeElectedMaster() == false) { + // Delegates stop datafeed to elected master node, so it becomes the coordinating node. + // See comment in StartDatafeedAction.Transport class for more information. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master node")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, Response::new)); } } else { - Set executorNodes = new HashSet<>(); - Map datafeedIdToPersistentTaskId = new HashMap<>(); + MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - for (String datafeedId : resolvedDatafeeds) { - PersistentTask datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); - executorNodes.add(datafeedTask.getExecutorNode()); - datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId()); + List resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks); + if (resolvedDatafeeds.isEmpty()) { + listener.onResponse(new Response(true)); + return; } + request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()])); - ActionListener finalListener = - ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure); + if (request.force) { + final AtomicInteger counter = new AtomicInteger(); + final AtomicArray failures = new AtomicArray<>(resolvedDatafeeds.size()); - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); - super.doExecute(task, request, finalListener); + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); + if (datafeedTask != null) { + persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener>() { + @Override + public void onResponse(PersistentTask persistentTask) { + if (counter.incrementAndGet() == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } + + @Override + public void onFailure(Exception e) { + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, e); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } + }); + } else { + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + + "datafeed's task could not be found."; + logger.warn(msg); + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, new RuntimeException(msg)); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } + } + } else { + Set executorNodes = new HashSet<>(); + Map datafeedIdToPersistentTaskId = new HashMap<>(); + + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); + executorNodes.add(datafeedTask.getExecutorNode()); + datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId()); + } + + ActionListener finalListener = + ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure); + + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + super.doExecute(task, request, finalListener); + } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index ef23b998594..bd04430d61e 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; @@ -39,8 +38,8 @@ import java.util.function.Supplier; */ // TODO: Hacking around here with TransportTasksAction. Ideally we should have another base class in core that // redirects to a single node only -public abstract class TransportJobTaskAction, - Response extends BaseTasksResponse & Writeable> extends TransportTasksAction { +public abstract class TransportJobTaskAction, + Response extends BaseTasksResponse & Writeable> extends TransportTasksAction { protected final AutodetectProcessManager processManager; @@ -71,25 +70,6 @@ public abstract class TransportJobTaskAction listener) { - ClusterState state = clusterService.state(); - PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - JobState jobState = MlMetadata.getJobState(task.getJobId(), tasks); - if (jobState == JobState.OPENED) { - innerTaskOperation(request, task, listener, state); - } else { - logger.warn("Unexpected job state based on cluster state version [{}]", state.getVersion()); - // Note that DatafeedJob relies on this exception being thrown to detect the state - // conflict and stop the datafeed. If this exception type/status changes, DatafeedJob - // also needs to change. - listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot perform requested action because job [" + - request.getJobId() + "] is not open")); - } - } - - protected abstract void innerTaskOperation(Request request, OperationTask task, ActionListener listener, ClusterState state); - @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java index 8bfbc3d5260..aac88cd7c30 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/UpdateProcessAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -179,7 +178,7 @@ public class UpdateProcessAction extends } } - public static class TransportAction extends TransportJobTaskAction { + public static class TransportAction extends TransportJobTaskAction { @Inject public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, @@ -198,7 +197,7 @@ public class UpdateProcessAction extends } @Override - protected void innerTaskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener, ClusterState state) { + protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener listener) { try { processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(), request.getModelPlotConfig(), e -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index b983518dc32..eb3600a1c71 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -156,7 +156,7 @@ public class AutodetectProcessManager extends AbstractComponent { DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - throw new IllegalStateException("[" + jobId + "] Cannot process data: no active autodetect process for job"); + throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobId + "] is not open"); } communicator.writeToJob(input, xContentType, params, handler); } @@ -174,9 +174,9 @@ public class AutodetectProcessManager extends AbstractComponent { logger.debug("Flushing job {}", jobId); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - String message = String.format(Locale.ROOT, "[%s] Cannot flush: no active autodetect process for job", jobId); + String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobId); logger.debug(message); - throw new IllegalArgumentException(message); + throw ExceptionsHelper.conflictStatusException(message); } communicator.flushJob(params, (aVoid, e) -> { @@ -194,8 +194,9 @@ public class AutodetectProcessManager extends AbstractComponent { Consumer handler) { AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId); if (communicator == null) { - logger.debug("Cannot update model debug config: no active autodetect process for job {}", jobId); - handler.accept(null); + String message = "Cannot process update model debug config because job [" + jobId + "] is not open"; + logger.debug(message); + handler.accept(ExceptionsHelper.conflictStatusException(message)); return; } communicator.writeUpdateProcessMessage(config, updates, (aVoid, e) -> { diff --git a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index 1119d36da88..f6b5a09ffea 100644 --- a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.integration; -import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Response; @@ -16,10 +15,8 @@ import org.elasticsearch.xpack.ml.MachineLearning; import java.io.IOException; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentType.JSON; @@ -44,7 +41,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); - assertBusy(this::assertSameClusterStateOnAllNodes); String postData = "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + @@ -149,7 +145,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("stopped", true), responseEntityToMap(response)); - assertBusy(this::assertSameClusterStateOnAllNodes); response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close", Collections.singletonMap("timeout", "20s")); @@ -170,7 +165,6 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); - assertBusy(this::assertSameClusterStateOnAllNodes); String postData = "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + @@ -210,8 +204,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { Collections.singletonMap("timeout", "20s")); assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); - assertBusy(this::assertSameClusterStateOnAllNodes); - + // feed some more data points postData = "{\"airline\":\"AAL\",\"responsetime\":\"136.2361\",\"sourcetype\":\"farequote\",\"time\":\"1407081600\"}\n" + @@ -308,28 +301,4 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false); } - // When open job api returns the cluster state on nodes other than master node or node that acted as coordinating node, - // may not have had the latest update with job state set to opened. This may fail subsequent post data, flush, or - // close calls until that node that is running the job task has applied the cluster state where job state has been set to opened. - // this method waits until all nodes in the cluster have the same cluster state version, so that such failures can be - // avoided in tests. Note that the job has been started on the node running the job task (autodetect process is running), - // this is just a workaround for inconsistency in cluster states that may happen for a small amount of time. - private void assertSameClusterStateOnAllNodes(){ - assert getClusterHosts().size() > 1; - Set versions = new HashSet<>(); - for (HttpHost host : getClusterHosts()) { - try { - // Client round robins between cluster hosts: - Response response = client().performRequest("get", "/_cluster/state/version", Collections.singletonMap("local", "true")); - assertEquals(200, response.getStatusLine().getStatusCode()); - int version = (Integer) responseEntityToMap(response).get("version"); - logger.info("Sampled version [{}]", version); - versions.add(version); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - assertEquals(1, versions.size()); - } - }