From 7ef9a16f45819f0b5f98de8a32fe5612529290b0 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 11 Apr 2017 13:39:22 +0200 Subject: [PATCH] [ML] implement '_all' for stopping datafeeds (elastic/x-pack-elasticsearch#995) Add a '_all' functionality for stopping ML datafeeds. For cluster shutdown due to maintenance and major upgrades we recommend the user to stop all datafeeds and jobs. This change add the ability to stop all datafeeds at once where previously it was required to iterate over all feeds and do a explicit stop. This is part two of elastic/x-pack-elasticsearch#795, part one can be found in elastic/x-pack-elasticsearch#962 . relates elastic/x-pack-elasticsearch#795 Original commit: elastic/x-pack-elasticsearch@ed1eff83d55e42f285d72dab5f62acfd6a443ea5 --- .../xpack/ml/action/CloseJobAction.java | 37 +--- .../xpack/ml/action/StopDatafeedAction.java | 198 ++++++++++++++---- .../persistent/PersistentTasksService.java | 26 +++ .../StopDatafeedActionRequestTests.java | 39 ++++ .../integration/MlRestTestStateCleaner.java | 35 ++-- .../xpack/ml/support/BaseMlIntegTestCase.java | 40 ++-- .../integration/MlRestTestStateCleaner.java | 35 ++-- 7 files changed, 293 insertions(+), 117 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 12bd9c919c5..0f2c0745950 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -36,7 +35,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -68,7 +66,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; import java.util.stream.Collectors; public class CloseJobAction extends Action { @@ -465,9 +462,7 @@ public class CloseJobAction extends Action jobIdToPersistentTaskId, Response response, ActionListener listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, - request.timeout, logger, threadPool.getThreadContext()); - waitForPersistentTaskStatus(stateObserver, persistentTasksCustomMetaData -> { + persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { for (Map.Entry entry : jobIdToPersistentTaskId.entrySet()) { long persistentTaskId = entry.getValue(); if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { @@ -475,7 +470,7 @@ public class CloseJobAction extends Action() { + }, request.timeout, new ActionListener() { @Override public void onResponse(Boolean result) { Set jobIds = jobIdToPersistentTaskId.keySet(); @@ -501,34 +496,6 @@ public class CloseJobAction extends Action predicate, - ActionListener listener) { - if (predicate.test(stateObserver.setAndGetObservedState().metaData() - .custom(PersistentTasksCustomMetaData.TYPE))) { - listener.onResponse(true); - } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - listener.onResponse(true); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new IllegalStateException("timed out after " + timeout)); - } - }, clusterState -> predicate - .test(clusterState.metaData() - .custom(PersistentTasksCustomMetaData.TYPE))); - } - } } static List resolveAndValidateJobId(String jobId, ClusterState state) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index fd73cf25818..b985a3ef7bc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -27,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,16 +38,25 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class StopDatafeedAction extends Action { @@ -94,21 +105,31 @@ public class StopDatafeedAction } private String datafeedId; + private String[] resolvedDatafeedIds; private TimeValue stopTimeout = DEFAULT_TIMEOUT; private boolean force = false; - public Request(String jobId) { - this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); + public Request(String datafeedId) { + this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); + this.resolvedDatafeedIds = new String[] { datafeedId }; setActions(StartDatafeedAction.NAME); } Request() { } - public String getDatafeedId() { + private String getDatafeedId() { return datafeedId; } + private String[] getResolvedDatafeedIds() { + return resolvedDatafeedIds; + } + + private void setResolvedDatafeedIds(String[] resolvedDatafeedIds) { + this.resolvedDatafeedIds = resolvedDatafeedIds; + } + public TimeValue getStopTimeout() { return stopTimeout; } @@ -127,8 +148,13 @@ public class StopDatafeedAction @Override public boolean match(Task task) { - String expectedDescription = "datafeed-" + datafeedId; - return task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription()); + for (String id : resolvedDatafeedIds) { + String expectedDescription = "datafeed-" + id; + if (task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription())){ + return true; + } + } + return false; } @Override @@ -140,6 +166,7 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + resolvedDatafeedIds = in.readStringArray(); stopTimeout = new TimeValue(in); force = in.readBoolean(); } @@ -148,6 +175,7 @@ public class StopDatafeedAction public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + out.writeStringArray(resolvedDatafeedIds); stopTimeout.writeTo(out); out.writeBoolean(force); } @@ -241,33 +269,99 @@ public class StopDatafeedAction MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + List resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks); + if (resolvedDatafeeds.isEmpty()) { + listener.onResponse(new Response(true)); + return; + } + request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()])); + if (request.force) { - PersistentTask datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); - if (datafeedTask != null) { - forceStopTask(datafeedTask.getId(), listener); - } else { - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + - "datafeed's task could not be found."; - logger.warn(msg); - listener.onFailure(new RuntimeException(msg)); + final AtomicInteger counter = new AtomicInteger(); + final AtomicArray failures = new AtomicArray<>(resolvedDatafeeds.size()); + + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); + if (datafeedTask != null) { + persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener>() { + @Override + public void onResponse(PersistentTask persistentTask) { + if (counter.incrementAndGet() == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + listener.onResponse(new Response(true)); + } + @Override + public void onFailure(Exception e) { + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, e); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } + }); + } else { + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + + "datafeed's task could not be found."; + logger.warn(msg); + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, new RuntimeException(msg)); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } } } else { - PersistentTask datafeedTask = validateAndReturnDatafeedTask(request.getDatafeedId(), mlMetadata, tasks); - request.setNodes(datafeedTask.getExecutorNode()); + Set executorNodes = new HashSet<>(); + Map datafeedIdToPersistentTaskId = new HashMap<>(); + + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); + executorNodes.add(datafeedTask.getExecutorNode()); + datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId()); + } + ActionListener finalListener = - ActionListener.wrap(r -> waitForDatafeedStopped(datafeedTask.getId(), request, r, listener), listener::onFailure); + ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure); + + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); super.doExecute(task, request, finalListener); } } + private void sendResponseOrFailure(String datafeedId, ActionListener listener, + AtomicArray failures) { + List catchedExceptions = failures.asList(); + if (catchedExceptions.size() == 0) { + listener.onResponse(new Response(true)); + return; + } + + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + + "] failures, rethrowing last, all Exceptions: [" + + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + + "]"; + + ElasticsearchException e = new ElasticsearchException(msg, + catchedExceptions.get(0)); + listener.onFailure(e); + } + // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. - void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(), - new WaitForPersistentTaskStatusListener() { + void waitForDatafeedStopped(Map datafeedIdToPersistentTaskId, Request request, Response response, ActionListener listener) { + persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { + for (Map.Entry entry : datafeedIdToPersistentTaskId.entrySet()) { + long persistentTaskId = entry.getValue(); + if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { + return false; + } + } + return true; + }, request.getTimeout(), new ActionListener() { @Override - public void onResponse(PersistentTask task) { + public void onResponse(Boolean result) { listener.onResponse(response); } @@ -278,24 +372,26 @@ public class StopDatafeedAction }); } - private void forceStopTask(long persistentTaskId, ActionListener listener) { - persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener>() { - @Override - public void onResponse(PersistentTask persistentTask) { - listener.onResponse(new Response(true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { - return TransportJobTaskAction.selectFirst(tasks, taskOperationFailures, failedNodeExceptions); + // number of resolved data feeds should be equal to the number of + // tasks, otherwise something went wrong + if (request.getResolvedDatafeedIds().length != tasks.size()) { + if (taskOperationFailures.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(failedNodeExceptions.get(0)); + } else { + throw new IllegalStateException( + "Expected [" + request.getResolvedDatafeedIds().length + + "] number of tasks but " + "got [" + tasks.size() + "]"); + } + } + + return new Response(tasks.stream().allMatch(Response::isStopped)); } @Override @@ -315,6 +411,36 @@ public class StopDatafeedAction } } + static List resolve(String datafeedId, MlMetadata mlMetadata, + PersistentTasksCustomMetaData tasks) { + if (!Job.ALL.equals(datafeedId)) { + return Collections.singletonList(datafeedId); + } + + if (mlMetadata.getDatafeeds().isEmpty()) { + return Collections.emptyList(); + } + + List matched_datafeeds = new ArrayList<>(); + + for (Map.Entry datafeedEntry : mlMetadata.getDatafeeds() + .entrySet()) { + String resolvedDatafeedId = datafeedEntry.getKey(); + DatafeedConfig datafeed = datafeedEntry.getValue(); + + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), + tasks); + + if (datafeedState == DatafeedState.STOPPED) { + continue; + } + + matched_datafeeds.add(resolvedDatafeedId); + } + + return matched_datafeeds; + } + static PersistentTask validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index f00702b90b1..7bff400b2ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -143,6 +143,32 @@ public class PersistentTasksService extends AbstractComponent { } } + public void waitForPersistentTasksStatus(Predicate predicate, + @Nullable TimeValue timeout, ActionListener listener) { + ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, + logger, threadPool.getThreadContext()); + if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) { + listener.onResponse(true); + } else { + stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(true); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new IllegalStateException("timed out after " + timeout)); + } + }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE))); + } + } + public interface WaitForPersistentTaskStatusListener extends ActionListener> { default void onTimeout(TimeValue timeout) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index a676fbe0c5d..089c9d15c8f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -10,17 +10,22 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.MlMetadata.Builder; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; @@ -88,4 +93,38 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped")); } + public void testResolveAll() { + Map> taskMap = new HashMap<>(); + Builder mlMetadataBuilder = new MlMetadata.Builder(); + + PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_1", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STARTED); + taskMap.put(1L, task); + Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + task = new PersistentTask(2L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STOPPED); + taskMap.put(2L, task); + job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); + datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + task = new PersistentTask(3L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_3", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STARTED); + taskMap.put(3L, task); + job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); + datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, taskMap); + MlMetadata mlMetadata = mlMetadataBuilder.build(); + + assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 7232cd76b02..f44ce2b2625 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -50,21 +50,30 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e1) { + logger.warn("failed to stop all datafeeds. Forcing stop", e1); + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e2) { + logger.warn("Force-closing all data feeds failed", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeeds, something went wrong?", e1); + } + for (Map datafeed : datafeeds) { String datafeedId = (String) datafeed.get("datafeed_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); - } - } catch (Exception e) { - if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "]", e); - } else { - logger.warn("failed to stop datafeed [" + datafeedId + "]", e); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 6daf2bdcd5a..fe940a81b78 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -249,28 +249,28 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception { MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); + try { + logger.info("Closing all datafeeds (using _all)"); + StopDatafeedAction.Response stopResponse = client + .execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request("_all")) + .get(); + assertTrue(stopResponse.isStopped()); + } catch (ExecutionException e1) { + try { + StopDatafeedAction.Request request = new StopDatafeedAction.Request("_all"); + request.setForce(true); + StopDatafeedAction.Response stopResponse = client + .execute(StopDatafeedAction.INSTANCE, request).get(); + assertTrue(stopResponse.isStopped()); + } catch (ExecutionException e2) { + logger.warn("Force-stopping datafeed with _all failed.", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeed, something went wrong?", e1); + } + for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) { String datafeedId = datafeed.getId(); - try { - logger.info("Closing datafeed [{}]", datafeedId); - StopDatafeedAction.Response stopResponse = - client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); - assertTrue(stopResponse.isStopped()); - } catch (ExecutionException e1) { - if (e1.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e1); - } else { - try { - StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); - request.setForce(true); - StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); - assertTrue(stopResponse.isStopped()); - } catch (Exception e2) { - logger.warn("Force-stopping datafeed [" + datafeedId + "] failed.", e2); - } - throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e1); - } - } assertBusy(() -> { try { GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); diff --git a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index f15f407f97b..e9ea5f7f9e9 100644 --- a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -50,21 +50,30 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e1) { + logger.warn("failed to stop all datafeeds. Forcing stop", e1); + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e2) { + logger.warn("Force-closing all data feeds failed", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeeds, something went wrong?", e1); + } + for (Map datafeed : datafeeds) { String datafeedId = (String) datafeed.get("datafeed_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); - } - } catch (Exception e) { - if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "]", e); - } else { - logger.warn("failed to stop datafeed [" + datafeedId + "]", e); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);