From b5d159bc1c83756b1a0f4d325041da2b2b6f6ab3 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 11 Aug 2017 14:43:24 +0100 Subject: [PATCH] [ML] Handle simultaneous force delete datafeed and stop datafeed (elastic/x-pack-elasticsearch#2243) This is an important case as the UI force stops datafeeds now. Fixes elastic/x-pack-kibana#2083 Original commit: elastic/x-pack-elasticsearch@4d0f62ad2d78aa6e8667e691feb72f12ca8e87fe --- .../xpack/ml/datafeed/DatafeedManager.java | 5 +- .../xpack/ml/integration/DatafeedJobsIT.java | 57 ++++++++++++++++--- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 173e02d4fa2..56aef639e4c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -145,7 +145,10 @@ public class DatafeedManager extends AbstractComponent { } public void isolateDatafeed(long allocationId) { - runningDatafeedsOnThisNode.get(allocationId).isolateDatafeed(); + Holder holder = runningDatafeedsOnThisNode.get(allocationId); + if (holder != null) { + holder.isolateDatafeed(); + } } // Important: Holder must be created and assigned to DatafeedTask before setting state to started, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index fe55243b1b5..d97eb709551 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -5,12 +5,13 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; @@ -25,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; @@ -52,7 +54,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { client().admin().indices().prepareCreate("data-2") .addMapping("type", "time", "type=date") .get(); - ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get(); + client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get(); long numDocs2 = randomIntBetween(32, 2048); indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now); @@ -62,9 +64,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { assertTrue(putJobResponse.isAcknowledged()); assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT)); openJob(job.getId()); - assertBusy(() -> { - assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED); - }); + assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); List t = new ArrayList<>(2); t.add("data-1"); @@ -155,6 +155,49 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { } } + public void testRealtime_givenSimultaneousStopAndForceDelete() throws Throwable { + String jobId = "realtime-job-stop-and-force-delete"; + final String datafeedId = jobId + "-datafeed"; + startRealtime(jobId); + + AtomicReference exception = new AtomicReference<>(); + + // The UI now force deletes datafeeds, which means they can be deleted while running. + // The first step is to isolate the datafeed. But if it was already being stopped then + // the datafeed may not be running by the time the isolate action is executed. This + // test will sometimes (depending on thread scheduling) achieve this situation and ensure + // the code is robust to it. + Thread deleteDatafeedThread = new Thread(() -> { + try { + DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId); + request.setForce(true); + DeleteDatafeedAction.Response response = client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet(); + if (response.isAcknowledged()) { + GetDatafeedsStatsAction.Request statsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + expectThrows(ResourceNotFoundException.class, + () -> client().execute(GetDatafeedsStatsAction.INSTANCE, statsRequest).actionGet()); + } else { + exception.set(new AssertionError("Job is not deleted")); + } + } catch (AssertionError | Exception e) { + exception.set(e); + } + }); + deleteDatafeedThread.start(); + + try { + stopDatafeed(datafeedId); + } catch (ResourceNotFoundException e) { + // This is OK - it means the thread running the delete fully completed before the stop started to execute + } finally { + deleteDatafeedThread.join(); + } + + if (exception.get() != null) { + throw exception.get(); + } + } + private void startRealtime(String jobId) throws Exception { client().admin().indices().prepareCreate("data") .addMapping("type", "time", "type=date") @@ -168,9 +211,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { registerJob(job); assertTrue(putJob(job).isAcknowledged()); openJob(job.getId()); - assertBusy(() -> { - assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED); - }); + assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data")); registerDatafeed(datafeedConfig);