[ML] Handle simultaneous force delete datafeed and stop datafeed (elastic/x-pack-elasticsearch#2243)
This is an important case as the UI now force deletes datafeeds.

Fixes elastic/x-pack-kibana#2083

Original commit: elastic/x-pack-elasticsearch@4d0f62ad2d
parent 3ad2f5e9f5
commit b5d159bc1c
@@ -145,7 +145,10 @@ public class DatafeedManager extends AbstractComponent {
     }
 
     public void isolateDatafeed(long allocationId) {
-        runningDatafeedsOnThisNode.get(allocationId).isolateDatafeed();
+        Holder holder = runningDatafeedsOnThisNode.get(allocationId);
+        if (holder != null) {
+            holder.isolateDatafeed();
+        }
     }
 
     // Important: Holder must be created and assigned to DatafeedTask before setting state to started,
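The DatafeedManager hunk above is the heart of the fix. A force delete first isolates the datafeed, but if a stop request is racing ahead of it the datafeed may already have been removed from runningDatafeedsOnThisNode, so get(allocationId) returns null and the old code threw a NullPointerException. The sketch below condenses the race and the guard; Holder and the map are hypothetical stand-ins for the real DatafeedManager internals, which use a ConcurrentMapLong keyed by allocation id.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Stand-ins for DatafeedManager internals, for illustration only.
    class IsolateRaceSketch {
        static class Holder {
            void isolateDatafeed() { /* stop the datafeed from reporting to its job */ }
        }

        // The real field is a ConcurrentMapLong<Holder>; a ConcurrentHashMap keyed
        // by allocation id behaves the same for the purposes of this sketch.
        private final Map<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();

        // Before the fix: throws NullPointerException when a concurrent stop has
        // already removed the entry for this allocation id.
        void isolateDatafeedUnsafe(long allocationId) {
            runningDatafeedsOnThisNode.get(allocationId).isolateDatafeed();
        }

        // After the fix: a missing entry means the datafeed has already stopped,
        // so isolation becomes a harmless no-op instead of an error.
        void isolateDatafeedSafe(long allocationId) {
            Holder holder = runningDatafeedsOnThisNode.get(allocationId);
            if (holder != null) {
                holder.isolateDatafeed();
            }
        }
    }

The remaining hunks add an integration test for this race to DatafeedJobsIT.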
@@ -5,12 +5,13 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
+import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.ml.action.PutJobAction;
 import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
@@ -25,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
@@ -52,7 +54,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         client().admin().indices().prepareCreate("data-2")
                 .addMapping("type", "time", "type=date")
                 .get();
-        ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
+        client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
         long numDocs2 = randomIntBetween(32, 2048);
         indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
@@ -62,9 +64,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         assertTrue(putJobResponse.isAcknowledged());
         assertThat(putJobResponse.getResponse().getJobVersion(), equalTo(Version.CURRENT));
         openJob(job.getId());
-        assertBusy(() -> {
-            assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
-        });
+        assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
 
         List<String> t = new ArrayList<>(2);
         t.add("data-1");
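This hunk (and the matching one at the end of the diff) also collapses a single-statement assertBusy block into an expression lambda; behaviour is unchanged. For context, assertBusy from the Elasticsearch test framework retries the assertion until it passes or a timeout elapses. A simplified sketch of that idea, not the real implementation, which uses incremental backoff and a 10 second default timeout:

    // Simplified model of ESTestCase.assertBusy: retry until the assertion
    // passes or the deadline is reached, then surface the last failure.
    class AssertBusySketch {
        static void assertBusy(Runnable assertion, long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                try {
                    assertion.run();
                    return; // assertion finally passed
                } catch (AssertionError e) {
                    if (System.currentTimeMillis() >= deadline) {
                        throw e; // timed out: report the most recent failure
                    }
                }
                Thread.sleep(50); // wait briefly before retrying
            }
        }
    }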
|
@@ -155,6 +155,49 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         }
     }
 
+    public void testRealtime_givenSimultaneousStopAndForceDelete() throws Throwable {
+        String jobId = "realtime-job-stop-and-force-delete";
+        final String datafeedId = jobId + "-datafeed";
+        startRealtime(jobId);
+
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+
+        // The UI now force deletes datafeeds, which means they can be deleted while running.
+        // The first step is to isolate the datafeed. But if it was already being stopped then
+        // the datafeed may not be running by the time the isolate action is executed. This
+        // test will sometimes (depending on thread scheduling) achieve this situation and ensure
+        // the code is robust to it.
+        Thread deleteDatafeedThread = new Thread(() -> {
+            try {
+                DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
+                request.setForce(true);
+                DeleteDatafeedAction.Response response = client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet();
+                if (response.isAcknowledged()) {
+                    GetDatafeedsStatsAction.Request statsRequest = new GetDatafeedsStatsAction.Request(datafeedId);
+                    expectThrows(ResourceNotFoundException.class,
+                            () -> client().execute(GetDatafeedsStatsAction.INSTANCE, statsRequest).actionGet());
+                } else {
+                    exception.set(new AssertionError("Job is not deleted"));
+                }
+            } catch (AssertionError | Exception e) {
+                exception.set(e);
+            }
+        });
+        deleteDatafeedThread.start();
+
+        try {
+            stopDatafeed(datafeedId);
+        } catch (ResourceNotFoundException e) {
+            // This is OK - it means the thread running the delete fully completed before the stop started to execute
+        } finally {
+            deleteDatafeedThread.join();
+        }
+
+        if (exception.get() != null) {
+            throw exception.get();
+        }
+    }
+
     private void startRealtime(String jobId) throws Exception {
         client().admin().indices().prepareCreate("data")
                 .addMapping("type", "time", "type=date")
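The new test leans on a small concurrency-testing idiom: the force delete runs on a background thread while the stop runs on the test thread, and any failure on the background thread is captured in an AtomicReference and rethrown after join(), because a throwable that escapes a plain Thread would kill that thread silently instead of failing the test. A distilled sketch of the idiom; the method and parameter names are illustrative, not from the source:

    import java.util.concurrent.atomic.AtomicReference;

    // Race two operations and propagate a background failure to the caller.
    class ConcurrentOpsSketch {
        static void raceOperations(Runnable background, Runnable foreground) throws Throwable {
            // Without this, an exception on the background thread would vanish.
            AtomicReference<Throwable> failure = new AtomicReference<>();
            Thread thread = new Thread(() -> {
                try {
                    background.run();
                } catch (Throwable t) {
                    failure.set(t);
                }
            });
            thread.start();
            try {
                foreground.run();
            } finally {
                thread.join(); // always wait, even if the foreground operation threw
            }
            if (failure.get() != null) {
                throw failure.get();
            }
        }
    }

Which interleaving actually occurs is down to thread scheduling, so a single run may or may not hit the isolate-after-stop window; repeated runs cover both orders, which is exactly what the comment inside the test acknowledges.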
@@ -168,9 +211,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         registerJob(job);
         assertTrue(putJob(job).isAcknowledged());
         openJob(job.getId());
-        assertBusy(() -> {
-            assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
-        });
+        assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
 
         DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
         registerDatafeed(datafeedConfig);