[ML] Remove recursion in bucket expansion (elastic/x-pack-elasticsearch#832)
Relates elastic/x-pack-elasticsearch#752 Original commit: elastic/x-pack-elasticsearch@85392296a5
This commit is contained in:
parent
28d6b505ed
commit
73434dadeb
|
@ -107,7 +107,7 @@ public class JobProvider {
|
|||
AnomalyRecord.FUNCTION.getPreferredName()
|
||||
);
|
||||
|
||||
private static final int RECORDS_SIZE_PARAM = 500;
|
||||
private static final int RECORDS_SIZE_PARAM = 10000;
|
||||
|
||||
private final Client client;
|
||||
private final Settings settings;
|
||||
|
@ -492,7 +492,12 @@ public class JobProvider {
|
|||
/**
|
||||
* Expand a bucket with its records
|
||||
*/
|
||||
// TODO (norelease): Use scroll search instead of multiple searches with increasing from
|
||||
// TODO: Ensure all records are included in expanded bucket (see x-pack-elasticsearch#833)
|
||||
// This now gets the first 10K records for a bucket. The rate of records per bucket
|
||||
// is controlled by parameter in the c++ process and its default value is 500. Users may
|
||||
// change that. If they change it they could arguably also change the soft limit for the
|
||||
// search size. However, ideally we should change this to use async scroll if the total
|
||||
// records is more than 10K.
|
||||
public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue, int from,
|
||||
Consumer<Integer> consumer, Consumer<Exception> errorHandler, Client client) {
|
||||
Consumer<QueryPage<AnomalyRecord>> h = page -> {
|
||||
|
@ -500,12 +505,7 @@ public class JobProvider {
|
|||
if (partitionFieldValue != null) {
|
||||
bucket.setAnomalyScore(bucket.partitionAnomalyScore(partitionFieldValue));
|
||||
}
|
||||
if (page.count() > from + RECORDS_SIZE_PARAM) {
|
||||
expandBucket(jobId, includeInterim, bucket, partitionFieldValue, from + RECORDS_SIZE_PARAM, consumer, errorHandler,
|
||||
client);
|
||||
} else {
|
||||
consumer.accept(bucket.getRecords().size());
|
||||
}
|
||||
consumer.accept(bucket.getRecords().size());
|
||||
};
|
||||
bucketRecords(jobId, bucket, from, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
|
||||
false, partitionFieldValue, h, errorHandler, client);
|
||||
|
|
|
@ -291,7 +291,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 0;
|
||||
int size = 10;
|
||||
Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response);
|
||||
|
@ -324,7 +324,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 99;
|
||||
int size = 17;
|
||||
|
||||
|
@ -358,7 +358,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(map);
|
||||
|
||||
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 99;
|
||||
int size = 17;
|
||||
|
||||
|
@ -388,7 +388,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
Long timestamp = 98765432123456789L;
|
||||
List<Map<String, Object>> source = new ArrayList<>();
|
||||
|
||||
SearchResponse response = createSearchResponse(false, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
@ -412,7 +412,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
map.put("bucket_span", 22);
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -440,7 +440,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
map.put("is_interim", true);
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -479,7 +479,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -529,7 +529,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
@ -586,7 +586,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 14;
|
||||
int size = 2;
|
||||
String sortfield = "minefield";
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -624,7 +624,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.add(recordMap);
|
||||
}
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -635,39 +635,6 @@ public class JobProviderTests extends ESTestCase {
|
|||
assertEquals(400L, records);
|
||||
}
|
||||
|
||||
public void testexpandBucket_WithManyRecords()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
Date now = new Date();
|
||||
Bucket bucket = new Bucket("foo", now, 22);
|
||||
|
||||
List<Map<String, Object>> source = new ArrayList<>();
|
||||
for (int i = 0; i < 600; i++) {
|
||||
Map<String, Object> recordMap = new HashMap<>();
|
||||
recordMap.put("job_id", "foo");
|
||||
recordMap.put("typical", 22.4 + i);
|
||||
recordMap.put("actual", 33.3 + i);
|
||||
recordMap.put("timestamp", now.getTime());
|
||||
recordMap.put("function", "irritable");
|
||||
recordMap.put("bucket_span", 22);
|
||||
recordMap.put("sequence_num", i + 1);
|
||||
source.add(recordMap);
|
||||
}
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
Integer[] holder = new Integer[1];
|
||||
provider.expandBucket(jobId, false, bucket, null, 0, records -> holder[0] = records, RuntimeException::new,
|
||||
client);
|
||||
int records = holder[0];
|
||||
|
||||
// This is not realistic, but is an artifact of the fact that the mock
|
||||
// query returns all the records, not a subset
|
||||
assertEquals(1200L, records);
|
||||
}
|
||||
|
||||
public void testCategoryDefinitions()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
|
@ -681,7 +648,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
int from = 0;
|
||||
int size = 10;
|
||||
Client client = getMockedClient(q -> {}, response);
|
||||
|
@ -707,7 +674,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
source.put("category_id", categoryId);
|
||||
source.put("terms", terms);
|
||||
|
||||
SearchResponse response = createSearchResponse(true, Collections.singletonList(source));
|
||||
SearchResponse response = createSearchResponse(Collections.singletonList(source));
|
||||
Client client = getMockedClient(q -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
|
@ -750,7 +717,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(q -> qbHolder[0] = q, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -812,7 +779,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(q -> qbHolder[0] = q, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -867,7 +834,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
|
||||
int from = 4;
|
||||
int size = 3;
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -918,7 +885,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
int from = 4;
|
||||
int size = 3;
|
||||
QueryBuilder[] qbHolder = new QueryBuilder[1];
|
||||
SearchResponse response = createSearchResponse(true, source);
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(qb -> qbHolder[0] = qb, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
|
@ -1060,7 +1027,8 @@ public class JobProviderTests extends ESTestCase {
|
|||
return getResponse;
|
||||
}
|
||||
|
||||
private static SearchResponse createSearchResponse(boolean exists, List<Map<String, Object>> source) throws IOException {
|
||||
private static SearchResponse createSearchResponse(List<Map<String, Object>> source)
|
||||
throws IOException {
|
||||
SearchResponse response = mock(SearchResponse.class);
|
||||
List<SearchHit> list = new ArrayList<>();
|
||||
|
||||
|
|
Loading…
Reference in New Issue