diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index 9b6523eb73c..8953c65d15b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -10,6 +10,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; @@ -106,15 +108,15 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { Instant now = Instant.now(); indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli()); - Job.Builder job = createScheduledJob("lookback-job"); + Job.Builder job = createScheduledJob("lookback-job-datafeed-recreated"); - String datafeedId = "lookback-datafeed"; + String datafeedId = "lookback-datafeed-datafeed-recreated"; DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data")); registerJob(job); putJob(job); - for (int i = 0; i < 2; ++i) { + CheckedRunnable openAndRunJob = () -> { openJob(job.getId()); assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); registerDatafeed(datafeedConfig); @@ -129,7 +131,10 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { }, 60, TimeUnit.SECONDS); deleteDatafeed(datafeedId); waitUntilJobIsClosed(job.getId()); - } + }; + + openAndRunJob.run(); + openAndRunJob.run(); } public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception { @@ -140,8 +145,8 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { Instant now = Instant.now(); indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli()); - Job.Builder jobA = createScheduledJob("lookback-job"); - Job.Builder jobB = createScheduledJob("other-lookback-job"); + Job.Builder jobA = createScheduledJob("lookback-job-jobid-updated"); + Job.Builder jobB = createScheduledJob("other-lookback-job-jobid-updated"); for (Job.Builder job : Arrays.asList(jobA, jobB)) { registerJob(job); putJob(job); @@ -152,11 +157,10 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { registerDatafeed(datafeedConfig); putDatafeed(datafeedConfig); - for (Job.Builder job : Arrays.asList(jobA, jobB, jobA)) { + CheckedConsumer openAndRunJob = job -> { openJob(job.getId()); assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); // Bind datafeedId to the current job on the list, timing stats are wiped out. - updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(job.getId()).build()); // Datafeed did not do anything yet, hence search_count is equal to 0. assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L)); startDatafeed(datafeedId, 0L, now.toEpochMilli()); @@ -166,7 +170,13 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L)); }, 60, TimeUnit.SECONDS); waitUntilJobIsClosed(job.getId()); - } + }; + + openAndRunJob.accept(jobA); + updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobB.getId()).build()); // wipes out timing stats + openAndRunJob.accept(jobB); + updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobA.getId()).build()); // wipes out timing stats + openAndRunJob.accept(jobA); } public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception { @@ -177,11 +187,11 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { Instant now = Instant.now(); indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli()); - Job.Builder job = createScheduledJob("lookback-job"); + Job.Builder job = createScheduledJob("lookback-job-query-delay-updated"); registerJob(job); putJob(job); - String datafeedId = "lookback-datafeed"; + String datafeedId = "lookback-datafeed-query-delay-updated"; DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data")); registerDatafeed(datafeedConfig); putDatafeed(datafeedConfig);