Use QUERY_THEN_FETCH (default) as search type for data extractor (elastic/elasticsearch#704)

I thought QUERY_AND_FETCH was the most efficient for the data extractor
but it does not work with sorting. It causes all shard results to be
returned before sorting and thus we may get out-of-order errors.

This commit switches to the default search type.

Original commit: elastic/x-pack-elasticsearch@d8a8155973
This commit is contained in:
Dimitris Athanasiou 2017-01-12 13:36:09 +00:00 committed by GitHub
parent af7edd84bb
commit b93ec686f3
3 changed files with 20 additions and 11 deletions

View File

@ -44,6 +44,7 @@ import org.elasticsearch.xpack.ml.transforms.date.DateTransform;
import org.elasticsearch.xpack.ml.transforms.date.DoubleDateTransform;
public abstract class AbstractDataToProcessWriter implements DataToProcessWriter {
protected static final int TIME_FIELD_OUT_INDEX = 0;
private static final int MS_IN_SECOND = 1000;

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
@ -105,7 +104,6 @@ class ScrollDataExtractor implements DataExtractor {
private SearchRequestBuilder buildSearchRequest() {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)
.setSearchType(SearchType.QUERY_AND_FETCH)
.addSort(context.timeField, SortOrder.ASC)
.setIndices(context.indexes)
.setTypes(context.types)

View File

@ -65,13 +65,20 @@ public class ScheduledJobsIT extends ESIntegTestCase {
}
public void testLookbackOnly() throws Exception {
client().admin().indices().prepareCreate("data")
client().admin().indices().prepareCreate("data-1")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = now - 604800000;
indexDocs(numDocs, lastWeek, now);
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs("data-1", numDocs, twoWeeksAgo, oneWeekAgo);
client().admin().indices().prepareCreate("data-2")
.addMapping("type", "time", "type=date")
.get();
long numDocs2 = randomIntBetween(32, 2048);
indexDocs("data-2", numDocs2, oneWeekAgo, now);
Job.Builder job = createJob();
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
@ -90,7 +97,8 @@ public class ScheduledJobsIT extends ESIntegTestCase {
client().execute(StartSchedulerAction.INSTANCE, startSchedulerRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs));
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
MlMetadata mlMetadata = client().admin().cluster().prepareState().all().get()
.getState().metaData().custom(MlMetadata.TYPE);
@ -105,7 +113,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
long numDocs1 = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = System.currentTimeMillis() - 604800000;
indexDocs(numDocs1, lastWeek, now);
indexDocs("data", numDocs1, lastWeek, now);
Job.Builder job = createJob();
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true, job.getId()));
@ -132,14 +140,16 @@ public class ScheduledJobsIT extends ESIntegTestCase {
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(numDocs2, now + 5000, now + 6000);
indexDocs("data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getInputRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
StopSchedulerAction.Request stopSchedulerRequest = new StopSchedulerAction.Request(schedulerConfig.getId());
@ -153,12 +163,12 @@ public class ScheduledJobsIT extends ESIntegTestCase {
assertThat(errorHolder.get(), nullValue());
}
private void indexDocs(long numDocs, long start, long end) {
private void indexDocs(String index, long numDocs, long start, long end) {
int maxIncrement = (int) ((end - start) / numDocs);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
long timestamp = start;
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest("data", "type");
IndexRequest indexRequest = new IndexRequest(index, "type");
indexRequest.source("time", timestamp);
bulkRequestBuilder.add(indexRequest);
timestamp += randomIntBetween(1, maxIncrement);
@ -190,7 +200,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(schedulerId, jobId);
builder.setQueryDelay(1);
builder.setFrequency(2);
builder.setIndexes(Collections.singletonList("data"));
builder.setIndexes(Collections.singletonList("data-*"));
builder.setTypes(Collections.singletonList("type"));
return builder.build();
}