diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index 381932443cc..c30a51c2d39 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.transforms.date.DateTransform; import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform; public abstract class AbstractDataToProcessWriter implements DataToProcessWriter { + protected static final int TIME_FIELD_OUT_INDEX = 0; private static final int MS_IN_SECOND = 1000; diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java index a1558a7adb4..d6af67da45b 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/scheduler/extractor/scroll/ScrollDataExtractor.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; -import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; @@ -105,7 +104,6 @@ class ScrollDataExtractor implements DataExtractor { private SearchRequestBuilder buildSearchRequest() { SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) .setScroll(SCROLL_TIMEOUT) - .setSearchType(SearchType.QUERY_AND_FETCH) .addSort(context.timeField, SortOrder.ASC) .setIndices(context.indexes) .setTypes(context.types) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java index 2be79e242a4..0bdc818f608 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/ScheduledJobsIT.java @@ -65,13 +65,20 @@ public class ScheduledJobsIT extends ESIntegTestCase { } public void testLookbackOnly() throws Exception { - client().admin().indices().prepareCreate("data") + client().admin().indices().prepareCreate("data-1") .addMapping("type", "time", "type=date") .get(); long numDocs = randomIntBetween(32, 2048); long now = System.currentTimeMillis(); - long lastWeek = now - 604800000; - indexDocs(numDocs, lastWeek, now); + long oneWeekAgo = now - 604800000; + long twoWeeksAgo = oneWeekAgo - 604800000; + indexDocs("data-1", numDocs, twoWeeksAgo, oneWeekAgo); + + client().admin().indices().prepareCreate("data-2") + .addMapping("type", "time", "type=date") + .get(); + long numDocs2 = randomIntBetween(32, 2048); + indexDocs("data-2", numDocs2, oneWeekAgo, now); Job.Builder job = createJob(); PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); @@ -90,7 +97,8 @@ public class ScheduledJobsIT extends ESIntegTestCase { client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get(); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); - assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs)); + assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs + numDocs2)); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get() .getState().metaData().custom(MlMetadata.TYPE); @@ -105,7 +113,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { long numDocs1 = randomIntBetween(32, 2048); long now = System.currentTimeMillis(); long lastWeek = System.currentTimeMillis() - 604800000; - indexDocs(numDocs1, lastWeek, now); + indexDocs("data", numDocs1, lastWeek, now); Job.Builder job = createJob(); PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId())); @@ -132,14 +140,16 @@ public class ScheduledJobsIT extends ESIntegTestCase { assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1)); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); long numDocs2 = randomIntBetween(2, 64); now = System.currentTimeMillis(); - indexDocs(numDocs2, now + 5000, now + 6000); + indexDocs("data", numDocs2, now + 5000, now + 6000); assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1 + numDocs2)); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }, 30, TimeUnit.SECONDS); StopSchedulerAction.Request stopSchedulerRequest = new StopSchedulerAction.Request(schedulerConfig.getId()); @@ -153,12 +163,12 @@ public class ScheduledJobsIT extends ESIntegTestCase { assertThat(errorHolder.get(), nullValue()); } - private void indexDocs(long numDocs, long start, long end) { + private void indexDocs(String index, long numDocs, long start, long end) { int maxIncrement = (int) ((end - start) / numDocs); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); long timestamp = start; for (int i = 0; i < numDocs; i++) { - IndexRequest indexRequest = new IndexRequest("data", "type"); + IndexRequest indexRequest = new IndexRequest(index, "type"); indexRequest.source("time", timestamp); bulkRequestBuilder.add(indexRequest); timestamp += randomIntBetween(1, maxIncrement); @@ -190,7 +200,7 @@ public class ScheduledJobsIT extends ESIntegTestCase { SchedulerConfig.Builder builder = new SchedulerConfig.Builder(schedulerId, jobId); builder.setQueryDelay(1); builder.setFrequency(2); - builder.setIndexes(Collections.singletonList("data")); + builder.setIndexes(Collections.singletonList("data-*")); builder.setTypes(Collections.singletonList("type")); return builder.build(); }