[ML] Fix query of all buckets with expand=true (elastic/x-pack-elasticsearch#932)

Previously if you queried all buckets with the expand=true option
you'd get an error as the code would try to do a search with
from=10000&size=10000.  This PR fixes this problem.

Followup for elastic/x-pack-elasticsearch#832

Original commit: elastic/x-pack-elasticsearch@2a5ca0998c
This commit is contained in:
David Roberts 2017-04-03 14:15:35 +01:00 committed by GitHub
parent 51662c83eb
commit c0c0818d87
2 changed files with 10 additions and 13 deletions

View File

@ -458,14 +458,14 @@ public class JobProvider {
if (query.isExpand()) {
Iterator<Bucket> bucketsToExpand = buckets.results().stream()
.filter(bucket -> bucket.getRecordCount() > 0).iterator();
expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client);
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
return;
}
} else {
if (query.isExpand()) {
Iterator<Bucket> bucketsToExpand = buckets.results().stream()
.filter(bucket -> bucket.getRecordCount() > 0).iterator();
expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client);
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
return;
}
}
@ -474,12 +474,12 @@ public class JobProvider {
}
private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
int from, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) {
Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) {
if (bucketsToExpand.hasNext()) {
Consumer<Integer> c = i -> {
expandBuckets(jobId, query, buckets, bucketsToExpand, from + RECORDS_SIZE_PARAM, handler, errorHandler, client);
expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
};
expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), from, c, errorHandler, client);
expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), c, errorHandler, client);
} else {
handler.accept(buckets);
}
@ -548,13 +548,10 @@ public class JobProvider {
/**
* Expand a bucket with its records
*/
// TODO: Ensure all records are included in expanded bucket (see x-pack-elasticsearch#833)
// This now gets the first 10K records for a bucket. The rate of records per bucket
// is controlled by parameter in the c++ process and its default value is 500. Users may
// change that. If they change it they could arguably also change the soft limit for the
// search size. However, ideally we should change this to use async scroll if the total
// records is more than 10K.
public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
// change that. Issue elastic/machine-learning-cpp#73 is open to prevent this.
public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue,
Consumer<Integer> consumer, Consumer<Exception> errorHandler, Client client) {
Consumer<QueryPage<AnomalyRecord>> h = page -> {
bucket.getRecords().addAll(page.results());
@ -563,7 +560,7 @@ public class JobProvider {
}
consumer.accept(bucket.getRecords().size());
};
bucketRecords(jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
bucketRecords(jobId, bucket, 0, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
false, partitionFieldValue, h, errorHandler, client);
}

View File

@ -581,7 +581,7 @@ public class JobProviderTests extends ESTestCase {
JobProvider provider = createProvider(client);
Integer[] holder = new Integer[1];
provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new,
provider.expandBucket(jobId, false, bucket, null, records -> holder[0] = records, RuntimeException::new,
client);
int records = holder[0];
assertEquals(400L, records);