diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 2b1f06b0739..fa71625ede0 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -458,14 +458,14 @@ public class JobProvider { if (query.isExpand()) { Iterator bucketsToExpand = buckets.results().stream() .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client); + expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); return; } } else { if (query.isExpand()) { Iterator bucketsToExpand = buckets.results().stream() .filter(bucket -> bucket.getRecordCount() > 0).iterator(); - expandBuckets(jobId, query, buckets, bucketsToExpand, 0, handler, errorHandler, client); + expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); return; } } @@ -474,12 +474,12 @@ public class JobProvider { } private void expandBuckets(String jobId, BucketsQuery query, QueryPage buckets, Iterator bucketsToExpand, - int from, Consumer> handler, Consumer errorHandler, Client client) { + Consumer> handler, Consumer errorHandler, Client client) { if (bucketsToExpand.hasNext()) { Consumer c = i -> { - expandBuckets(jobId, query, buckets, bucketsToExpand, from + RECORDS_SIZE_PARAM, handler, errorHandler, client); + expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client); }; - expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), from, c, errorHandler, client); + expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), c, errorHandler, client); } else { handler.accept(buckets); } @@ -548,13 +548,10 @@ public class JobProvider { /** * Expand a bucket with its records */ - // TODO: Ensure all records are included in expanded bucket (see x-pack-elasticsearch#833) // This now gets the first 10K records for a bucket. The rate of records per bucket // is controlled by parameter in the c++ process and its default value is 500. Users may - // change that. If they change it they could arguably also change the soft limit for the - // search size. However, ideally we should change this to use async scroll if the total - // records is more than 10K. - public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from, + // change that. Issue elastic/machine-learning-cpp#73 is open to prevent this. + public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, Consumer consumer, Consumer errorHandler, Client client) { Consumer> h = page -> { bucket.getRecords().addAll(page.results()); @@ -563,7 +560,7 @@ public class JobProvider { } consumer.accept(bucket.getRecords().size()); }; - bucketRecords(jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(), + bucketRecords(jobId, bucket, 0, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(), false, partitionFieldValue, h, errorHandler, client); } @@ -718,7 +715,7 @@ public class JobProvider { searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC); } - LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}", + LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}", AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "", secondarySort.isEmpty() ? "" : " with secondary sort", from, size); client.search(searchRequest, ActionListener.wrap(searchResponse -> { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index 2aa5c7d65f6..19bede0cd23 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -581,7 +581,7 @@ public class JobProviderTests extends ESTestCase { JobProvider provider = createProvider(client); Integer[] holder = new Integer[1]; - provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new, + provider.expandBucket(jobId, false, bucket, null, records -> holder[0] = records, RuntimeException::new, client); int records = holder[0]; assertEquals(400L, records);