diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java index 12bd9c919c5..0f2c0745950 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/CloseJobAction.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; @@ -36,7 +35,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -68,7 +66,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; import java.util.stream.Collectors; public class CloseJobAction extends Action { @@ -465,9 +462,7 @@ public class CloseJobAction extends Action jobIdToPersistentTaskId, Response response, ActionListener listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, - request.timeout, logger, threadPool.getThreadContext()); - waitForPersistentTaskStatus(stateObserver, persistentTasksCustomMetaData -> { + persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { for (Map.Entry entry : jobIdToPersistentTaskId.entrySet()) { long persistentTaskId = entry.getValue(); if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { @@ -475,7 +470,7 @@ public class CloseJobAction extends Action() { + }, request.timeout, new ActionListener() { @Override public void onResponse(Boolean result) { Set jobIds = jobIdToPersistentTaskId.keySet(); @@ -501,34 +496,6 @@ public class CloseJobAction extends Action predicate, - ActionListener listener) { - if (predicate.test(stateObserver.setAndGetObservedState().metaData() - .custom(PersistentTasksCustomMetaData.TYPE))) { - listener.onResponse(true); - } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - listener.onResponse(true); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new IllegalStateException("timed out after " + timeout)); - } - }, clusterState -> predicate - .test(clusterState.metaData() - .custom(PersistentTasksCustomMetaData.TYPE))); - } - } } static List resolveAndValidateJobId(String jobId, ClusterState state) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java index fd73cf25818..b985a3ef7bc 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/StopDatafeedAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -27,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,16 +38,25 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ml.MlMetadata; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasksService; -import org.elasticsearch.xpack.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class StopDatafeedAction extends Action { @@ -94,21 +105,31 @@ public class StopDatafeedAction } private String datafeedId; + private String[] resolvedDatafeedIds; private TimeValue stopTimeout = DEFAULT_TIMEOUT; private boolean force = false; - public Request(String jobId) { - this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName()); + public Request(String datafeedId) { + this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); + this.resolvedDatafeedIds = new String[] { datafeedId }; setActions(StartDatafeedAction.NAME); } Request() { } - public String getDatafeedId() { + private String getDatafeedId() { return datafeedId; } + private String[] getResolvedDatafeedIds() { + return resolvedDatafeedIds; + } + + private void setResolvedDatafeedIds(String[] resolvedDatafeedIds) { + this.resolvedDatafeedIds = resolvedDatafeedIds; + } + public TimeValue getStopTimeout() { return stopTimeout; } @@ -127,8 +148,13 @@ public class StopDatafeedAction @Override public boolean match(Task task) { - String expectedDescription = "datafeed-" + datafeedId; - return task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription()); + for (String id : resolvedDatafeedIds) { + String expectedDescription = "datafeed-" + id; + if (task instanceof StartDatafeedAction.DatafeedTask && expectedDescription.equals(task.getDescription())){ + return true; + } + } + return false; } @Override @@ -140,6 +166,7 @@ public class StopDatafeedAction public void readFrom(StreamInput in) throws IOException { super.readFrom(in); datafeedId = in.readString(); + resolvedDatafeedIds = in.readStringArray(); stopTimeout = new TimeValue(in); force = in.readBoolean(); } @@ -148,6 +175,7 @@ public class StopDatafeedAction public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(datafeedId); + out.writeStringArray(resolvedDatafeedIds); stopTimeout.writeTo(out); out.writeBoolean(force); } @@ -241,33 +269,99 @@ public class StopDatafeedAction MlMetadata mlMetadata = state.getMetaData().custom(MlMetadata.TYPE); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + List resolvedDatafeeds = resolve(request.getDatafeedId(), mlMetadata, tasks); + if (resolvedDatafeeds.isEmpty()) { + listener.onResponse(new Response(true)); + return; + } + request.setResolvedDatafeedIds(resolvedDatafeeds.toArray(new String[resolvedDatafeeds.size()])); + if (request.force) { - PersistentTask datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks); - if (datafeedTask != null) { - forceStopTask(datafeedTask.getId(), listener); - } else { - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + - "datafeed's task could not be found."; - logger.warn(msg); - listener.onFailure(new RuntimeException(msg)); + final AtomicInteger counter = new AtomicInteger(); + final AtomicArray failures = new AtomicArray<>(resolvedDatafeeds.size()); + + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); + if (datafeedTask != null) { + persistentTasksService.cancelPersistentTask(datafeedTask.getId(), new ActionListener>() { + @Override + public void onResponse(PersistentTask persistentTask) { + if (counter.incrementAndGet() == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + listener.onResponse(new Response(true)); + } + @Override + public void onFailure(Exception e) { + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, e); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } + }); + } else { + String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + + "datafeed's task could not be found."; + logger.warn(msg); + final int slot = counter.incrementAndGet(); + failures.set(slot - 1, new RuntimeException(msg)); + if (slot == resolvedDatafeeds.size()) { + sendResponseOrFailure(request.getDatafeedId(), listener, failures); + } + } } } else { - PersistentTask datafeedTask = validateAndReturnDatafeedTask(request.getDatafeedId(), mlMetadata, tasks); - request.setNodes(datafeedTask.getExecutorNode()); + Set executorNodes = new HashSet<>(); + Map datafeedIdToPersistentTaskId = new HashMap<>(); + + for (String datafeedId : resolvedDatafeeds) { + PersistentTask datafeedTask = validateAndReturnDatafeedTask(datafeedId, mlMetadata, tasks); + executorNodes.add(datafeedTask.getExecutorNode()); + datafeedIdToPersistentTaskId.put(datafeedId, datafeedTask.getId()); + } + ActionListener finalListener = - ActionListener.wrap(r -> waitForDatafeedStopped(datafeedTask.getId(), request, r, listener), listener::onFailure); + ActionListener.wrap(r -> waitForDatafeedStopped(datafeedIdToPersistentTaskId, request, r, listener), listener::onFailure); + + request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); super.doExecute(task, request, finalListener); } } + private void sendResponseOrFailure(String datafeedId, ActionListener listener, + AtomicArray failures) { + List catchedExceptions = failures.asList(); + if (catchedExceptions.size() == 0) { + listener.onResponse(new Response(true)); + return; + } + + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + + "] failures, rethrowing last, all Exceptions: [" + + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + + "]"; + + ElasticsearchException e = new ElasticsearchException(msg, + catchedExceptions.get(0)); + listener.onFailure(e); + } + // Wait for datafeed to be marked as stopped in cluster state, which means the datafeed persistent task has been removed // This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state, // so wait for that to happen here. - void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(), - new WaitForPersistentTaskStatusListener() { + void waitForDatafeedStopped(Map datafeedIdToPersistentTaskId, Request request, Response response, ActionListener listener) { + persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { + for (Map.Entry entry : datafeedIdToPersistentTaskId.entrySet()) { + long persistentTaskId = entry.getValue(); + if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { + return false; + } + } + return true; + }, request.getTimeout(), new ActionListener() { @Override - public void onResponse(PersistentTask task) { + public void onResponse(Boolean result) { listener.onResponse(response); } @@ -278,24 +372,26 @@ public class StopDatafeedAction }); } - private void forceStopTask(long persistentTaskId, ActionListener listener) { - persistentTasksService.cancelPersistentTask(persistentTaskId, new ActionListener>() { - @Override - public void onResponse(PersistentTask persistentTask) { - listener.onResponse(new Response(true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { - return TransportJobTaskAction.selectFirst(tasks, taskOperationFailures, failedNodeExceptions); + // number of resolved data feeds should be equal to the number of + // tasks, otherwise something went wrong + if (request.getResolvedDatafeedIds().length != tasks.size()) { + if (taskOperationFailures.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(failedNodeExceptions.get(0)); + } else { + throw new IllegalStateException( + "Expected [" + request.getResolvedDatafeedIds().length + + "] number of tasks but " + "got [" + tasks.size() + "]"); + } + } + + return new Response(tasks.stream().allMatch(Response::isStopped)); } @Override @@ -315,6 +411,36 @@ public class StopDatafeedAction } } + static List resolve(String datafeedId, MlMetadata mlMetadata, + PersistentTasksCustomMetaData tasks) { + if (!Job.ALL.equals(datafeedId)) { + return Collections.singletonList(datafeedId); + } + + if (mlMetadata.getDatafeeds().isEmpty()) { + return Collections.emptyList(); + } + + List matched_datafeeds = new ArrayList<>(); + + for (Map.Entry datafeedEntry : mlMetadata.getDatafeeds() + .entrySet()) { + String resolvedDatafeedId = datafeedEntry.getKey(); + DatafeedConfig datafeed = datafeedEntry.getValue(); + + DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), + tasks); + + if (datafeedState == DatafeedState.STOPPED) { + continue; + } + + matched_datafeeds.add(resolvedDatafeedId); + } + + return matched_datafeeds; + } + static PersistentTask validateAndReturnDatafeedTask(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) { DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId); if (datafeed == null) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java index f00702b90b1..7bff400b2ae 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/persistent/PersistentTasksService.java @@ -143,6 +143,32 @@ public class PersistentTasksService extends AbstractComponent { } } + public void waitForPersistentTasksStatus(Predicate predicate, + @Nullable TimeValue timeout, ActionListener listener) { + ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, + logger, threadPool.getThreadContext()); + if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) { + listener.onResponse(true); + } else { + stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + listener.onResponse(true); + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new IllegalStateException("timed out after " + timeout)); + } + }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE))); + } + } + public interface WaitForPersistentTaskStatusListener extends ActionListener> { default void onTimeout(TimeValue timeout) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java index a676fbe0c5d..089c9d15c8f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/StopDatafeedActionRequestTests.java @@ -10,17 +10,22 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.MlMetadata.Builder; import org.elasticsearch.xpack.ml.action.StopDatafeedAction.Request; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.support.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentTaskRequest; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedConfig; import static org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests.createDatafeedJob; @@ -88,4 +93,38 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe assertThat(e.getMessage(), equalTo("Cannot stop datafeed [foo] because it has already been stopped")); } + public void testResolveAll() { + Map> taskMap = new HashMap<>(); + Builder mlMetadataBuilder = new MlMetadata.Builder(); + + PersistentTask task = new PersistentTask(1L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_1", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STARTED); + taskMap.put(1L, task); + Job job = BaseMlIntegTestCase.createScheduledJob("job_id_1").build(new Date()); + DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed_1", "job_id_1").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + task = new PersistentTask(2L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_2", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STOPPED); + taskMap.put(2L, task); + job = BaseMlIntegTestCase.createScheduledJob("job_id_2").build(new Date()); + datafeedConfig = createDatafeedConfig("datafeed_2", "job_id_2").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + task = new PersistentTask(3L, StartDatafeedAction.NAME, + new StartDatafeedAction.Request("datafeed_3", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); + task = new PersistentTask<>(task, DatafeedState.STARTED); + taskMap.put(3L, task); + job = BaseMlIntegTestCase.createScheduledJob("job_id_3").build(new Date()); + datafeedConfig = createDatafeedConfig("datafeed_3", "job_id_3").build(); + mlMetadataBuilder.putJob(job, false).putDatafeed(datafeedConfig); + + PersistentTasksCustomMetaData tasks = new PersistentTasksCustomMetaData(1L, taskMap); + MlMetadata mlMetadata = mlMetadataBuilder.build(); + + assertEquals(Arrays.asList("datafeed_1", "datafeed_3"), StopDatafeedAction.resolve("_all", mlMetadata, tasks)); + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index 7232cd76b02..f44ce2b2625 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -50,21 +50,30 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e1) { + logger.warn("failed to stop all datafeeds. Forcing stop", e1); + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e2) { + logger.warn("Force-closing all data feeds failed", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeeds, something went wrong?", e1); + } + for (Map datafeed : datafeeds) { String datafeedId = (String) datafeed.get("datafeed_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); - } - } catch (Exception e) { - if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "]", e); - } else { - logger.warn("failed to stop datafeed [" + datafeedId + "]", e); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 6daf2bdcd5a..fe940a81b78 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -249,28 +249,28 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { public static void deleteAllDatafeeds(Logger logger, Client client) throws Exception { MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData(); MlMetadata mlMetadata = metaData.custom(MlMetadata.TYPE); + try { + logger.info("Closing all datafeeds (using _all)"); + StopDatafeedAction.Response stopResponse = client + .execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request("_all")) + .get(); + assertTrue(stopResponse.isStopped()); + } catch (ExecutionException e1) { + try { + StopDatafeedAction.Request request = new StopDatafeedAction.Request("_all"); + request.setForce(true); + StopDatafeedAction.Response stopResponse = client + .execute(StopDatafeedAction.INSTANCE, request).get(); + assertTrue(stopResponse.isStopped()); + } catch (ExecutionException e2) { + logger.warn("Force-stopping datafeed with _all failed.", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeed, something went wrong?", e1); + } + for (DatafeedConfig datafeed : mlMetadata.getDatafeeds().values()) { String datafeedId = datafeed.getId(); - try { - logger.info("Closing datafeed [{}]", datafeedId); - StopDatafeedAction.Response stopResponse = - client.execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).get(); - assertTrue(stopResponse.isStopped()); - } catch (ExecutionException e1) { - if (e1.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "], already stopped", e1); - } else { - try { - StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); - request.setForce(true); - StopDatafeedAction.Response stopResponse = client.execute(StopDatafeedAction.INSTANCE, request).get(); - assertTrue(stopResponse.isStopped()); - } catch (Exception e2) { - logger.warn("Force-stopping datafeed [" + datafeedId + "] failed.", e2); - } - throw new RuntimeException("Had to resort to force-stopping datafeed, something went wrong?", e1); - } - } assertBusy(() -> { try { GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); diff --git a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java index f15f407f97b..e9ea5f7f9e9 100644 --- a/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java +++ b/qa/smoke-test-ml-with-security/src/test/java/org/elasticsearch/xpack/ml/integration/MlRestTestStateCleaner.java @@ -50,21 +50,30 @@ public class MlRestTestStateCleaner { return; } + try { + int statusCode = adminClient.performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e1) { + logger.warn("failed to stop all datafeeds. Forcing stop", e1); + try { + int statusCode = adminClient + .performRequest("POST", "/_xpack/ml/datafeeds/_all/_stop?force=true") + .getStatusLine().getStatusCode(); + if (statusCode != 200) { + logger.error("Got status code " + statusCode + " when stopping datafeeds"); + } + } catch (Exception e2) { + logger.warn("Force-closing all data feeds failed", e2); + } + throw new RuntimeException( + "Had to resort to force-stopping datafeeds, something went wrong?", e1); + } + for (Map datafeed : datafeeds) { String datafeedId = (String) datafeed.get("datafeed_id"); - try { - int statusCode = adminClient.performRequest("POST", - "/_xpack/ml/datafeeds/" + datafeedId + "/_stop").getStatusLine().getStatusCode(); - if (statusCode != 200) { - logger.error("Got status code " + statusCode + " when stopping datafeed " + datafeedId); - } - } catch (Exception e) { - if (e.getMessage().contains("Cannot stop datafeed [" + datafeedId + "] because it has already been stopped")) { - logger.debug("failed to stop datafeed [" + datafeedId + "]", e); - } else { - logger.warn("failed to stop datafeed [" + datafeedId + "]", e); - } - } int statusCode = adminClient.performRequest("DELETE", "/_xpack/ml/datafeeds/" + datafeedId).getStatusLine().getStatusCode(); if (statusCode != 200) { logger.error("Got status code " + statusCode + " when deleting datafeed " + datafeedId);