This commit is contained in:
parent
475752be75
commit
40d3c60d7a
|
@ -10,6 +10,8 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
|
||||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
|
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.common.CheckedConsumer;
|
||||||
|
import org.elasticsearch.common.CheckedRunnable;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||||
|
@ -106,15 +108,15 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
||||||
|
|
||||||
Job.Builder job = createScheduledJob("lookback-job");
|
Job.Builder job = createScheduledJob("lookback-job-datafeed-recreated");
|
||||||
|
|
||||||
String datafeedId = "lookback-datafeed";
|
String datafeedId = "lookback-datafeed-datafeed-recreated";
|
||||||
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
|
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
|
||||||
|
|
||||||
registerJob(job);
|
registerJob(job);
|
||||||
putJob(job);
|
putJob(job);
|
||||||
|
|
||||||
for (int i = 0; i < 2; ++i) {
|
CheckedRunnable<Exception> openAndRunJob = () -> {
|
||||||
openJob(job.getId());
|
openJob(job.getId());
|
||||||
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
|
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
|
||||||
registerDatafeed(datafeedConfig);
|
registerDatafeed(datafeedConfig);
|
||||||
|
@ -129,7 +131,10 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
}, 60, TimeUnit.SECONDS);
|
}, 60, TimeUnit.SECONDS);
|
||||||
deleteDatafeed(datafeedId);
|
deleteDatafeed(datafeedId);
|
||||||
waitUntilJobIsClosed(job.getId());
|
waitUntilJobIsClosed(job.getId());
|
||||||
}
|
};
|
||||||
|
|
||||||
|
openAndRunJob.run();
|
||||||
|
openAndRunJob.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
|
public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
|
||||||
|
@ -140,8 +145,8 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
||||||
|
|
||||||
Job.Builder jobA = createScheduledJob("lookback-job");
|
Job.Builder jobA = createScheduledJob("lookback-job-jobid-updated");
|
||||||
Job.Builder jobB = createScheduledJob("other-lookback-job");
|
Job.Builder jobB = createScheduledJob("other-lookback-job-jobid-updated");
|
||||||
for (Job.Builder job : Arrays.asList(jobA, jobB)) {
|
for (Job.Builder job : Arrays.asList(jobA, jobB)) {
|
||||||
registerJob(job);
|
registerJob(job);
|
||||||
putJob(job);
|
putJob(job);
|
||||||
|
@ -152,11 +157,10 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
registerDatafeed(datafeedConfig);
|
registerDatafeed(datafeedConfig);
|
||||||
putDatafeed(datafeedConfig);
|
putDatafeed(datafeedConfig);
|
||||||
|
|
||||||
for (Job.Builder job : Arrays.asList(jobA, jobB, jobA)) {
|
CheckedConsumer<Job.Builder, Exception> openAndRunJob = job -> {
|
||||||
openJob(job.getId());
|
openJob(job.getId());
|
||||||
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
|
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
|
||||||
// Bind datafeedId to the current job on the list, timing stats are wiped out.
|
// Bind datafeedId to the current job on the list, timing stats are wiped out.
|
||||||
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(job.getId()).build());
|
|
||||||
// Datafeed did not do anything yet, hence search_count is equal to 0.
|
// Datafeed did not do anything yet, hence search_count is equal to 0.
|
||||||
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
|
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
|
||||||
startDatafeed(datafeedId, 0L, now.toEpochMilli());
|
startDatafeed(datafeedId, 0L, now.toEpochMilli());
|
||||||
|
@ -166,7 +170,13 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
|
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
|
||||||
}, 60, TimeUnit.SECONDS);
|
}, 60, TimeUnit.SECONDS);
|
||||||
waitUntilJobIsClosed(job.getId());
|
waitUntilJobIsClosed(job.getId());
|
||||||
}
|
};
|
||||||
|
|
||||||
|
openAndRunJob.accept(jobA);
|
||||||
|
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobB.getId()).build()); // wipes out timing stats
|
||||||
|
openAndRunJob.accept(jobB);
|
||||||
|
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobA.getId()).build()); // wipes out timing stats
|
||||||
|
openAndRunJob.accept(jobA);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
|
public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
|
||||||
|
@ -177,11 +187,11 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
|
||||||
Instant now = Instant.now();
|
Instant now = Instant.now();
|
||||||
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
|
||||||
|
|
||||||
Job.Builder job = createScheduledJob("lookback-job");
|
Job.Builder job = createScheduledJob("lookback-job-query-delay-updated");
|
||||||
registerJob(job);
|
registerJob(job);
|
||||||
putJob(job);
|
putJob(job);
|
||||||
|
|
||||||
String datafeedId = "lookback-datafeed";
|
String datafeedId = "lookback-datafeed-query-delay-updated";
|
||||||
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
|
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
|
||||||
registerDatafeed(datafeedConfig);
|
registerDatafeed(datafeedConfig);
|
||||||
putDatafeed(datafeedConfig);
|
putDatafeed(datafeedConfig);
|
||||||
|
|
Loading…
Reference in New Issue