Made client calls in get buckets code non blocking.

Also merged the JobProvider#getBucket(...) method into Jobprovider#getBuckets(...) method, because
it contained a lot of similar logic, otherwise it had to be converted to use non blocking client calls too.

Part of elastic/elasticsearch#127

Original commit: elastic/x-pack-elasticsearch@b1e66b62cb
This commit is contained in:
Martijn van Groningen 2017-01-04 13:35:00 +01:00
parent 51e1199860
commit 41f2f51df6
10 changed files with 354 additions and 531 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.persistence.BucketQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
@ -410,33 +409,26 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
QueryPage<Bucket> results;
// Single bucket
if (request.timestamp != null) {
BucketQueryBuilder.BucketQuery query =
new BucketQueryBuilder(request.timestamp).expand(request.expand)
.includeInterim(request.includeInterim)
.partitionValue(request.partitionValue)
.build();
BucketsQueryBuilder query =
new BucketsQueryBuilder().expand(request.expand)
.includeInterim(request.includeInterim)
.start(request.start)
.end(request.end)
.anomalyScoreThreshold(request.anomalyScore)
.normalizedProbabilityThreshold(request.maxNormalizedProbability)
.partitionValue(request.partitionValue);
results = jobProvider.bucket(request.jobId, query);
} else {
// Multiple buckets
BucketsQueryBuilder.BucketsQuery query =
new BucketsQueryBuilder().expand(request.expand)
.includeInterim(request.includeInterim)
.start(request.start)
.end(request.end)
.from(request.pageParams.getFrom())
.size(request.pageParams.getSize())
.anomalyScoreThreshold(request.anomalyScore)
.normalizedProbabilityThreshold(request.maxNormalizedProbability)
.partitionValue(request.partitionValue)
.build();
results = jobProvider.buckets(request.jobId, query);
if (request.pageParams != null) {
query.from(request.pageParams.getFrom())
.size(request.pageParams.getSize());
}
listener.onResponse(new Response(results));
if (request.timestamp != null) {
query.timestamp(request.timestamp);
} else {
query.start(request.start);
query.end(request.end);
}
jobProvider.buckets(request.jobId, query.build(), q -> listener.onResponse(new Response(q)), listener::onFailure);
}
}

View File

@ -1,112 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.common.Strings;
import java.util.Objects;
/**
* One time query builder for a single buckets.
* <ul>
* <li>Timestamp (Required) - Timestamp of the bucket</li>
* <li>Expand- Include anomaly records. Default= false</li>
* <li>IncludeInterim- Include interim results. Default = false</li>
* <li>partitionValue Set the bucket's max normalized probabiltiy to this
* partiton field value's max normalized probability. Default = null</li>
* </ul>
*/
public final class BucketQueryBuilder {
public static int DEFAULT_SIZE = 100;
private BucketQuery bucketQuery;
public BucketQueryBuilder(String timestamp) {
bucketQuery = new BucketQuery(timestamp);
}
public BucketQueryBuilder expand(boolean expand) {
bucketQuery.expand = expand;
return this;
}
public BucketQueryBuilder includeInterim(boolean include) {
bucketQuery.includeInterim = include;
return this;
}
/**
* partitionValue must be non null and not empty else it
* is not set
*/
public BucketQueryBuilder partitionValue(String partitionValue) {
if (!Strings.isNullOrEmpty(partitionValue)) {
bucketQuery.partitionValue = partitionValue;
}
return this;
}
public BucketQueryBuilder.BucketQuery build() {
return bucketQuery;
}
public class BucketQuery {
private String timestamp;
private boolean expand = false;
private boolean includeInterim = false;
private String partitionValue = null;
public BucketQuery(String timestamp) {
this.timestamp = timestamp;
}
public String getTimestamp() {
return timestamp;
}
public boolean isIncludeInterim() {
return includeInterim;
}
public boolean isExpand() {
return expand;
}
/**
* @return Null if not set
*/
public String getPartitionValue() {
return partitionValue;
}
@Override
public int hashCode() {
return Objects.hash(timestamp, expand, includeInterim,
partitionValue);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
BucketQuery other = (BucketQuery) obj;
return Objects.equals(timestamp, other.timestamp) &&
Objects.equals(expand, other.expand) &&
Objects.equals(includeInterim, other.includeInterim) &&
Objects.equals(partitionValue, other.partitionValue);
}
}
}

View File

@ -108,7 +108,17 @@ public final class BucketsQueryBuilder {
return this;
}
public BucketsQueryBuilder timestamp(String timestamp) {
bucketsQuery.timestamp = timestamp;
bucketsQuery.size = 1;
return this;
}
public BucketsQueryBuilder.BucketsQuery build() {
if (bucketsQuery.timestamp != null && (bucketsQuery.start != null || bucketsQuery.end != null)) {
throw new IllegalStateException("Either specify timestamp or start/end");
}
return bucketsQuery;
}
@ -126,6 +136,7 @@ public final class BucketsQueryBuilder {
private double normalizedProbability = 0.0d;
private String start;
private String end;
private String timestamp;
private String partitionValue = null;
private String sortField = Bucket.TIMESTAMP.getPreferredName();
private boolean sortDescending = false;
@ -162,6 +173,10 @@ public final class BucketsQueryBuilder {
return end;
}
public String getTimestamp() {
return timestamp;
}
/**
* @return Null if not set
*/
@ -180,7 +195,7 @@ public final class BucketsQueryBuilder {
@Override
public int hashCode() {
return Objects.hash(from, size, expand, includeInterim, anomalyScoreFilter, normalizedProbability, start, end,
partitionValue, sortField, sortDescending);
timestamp, partitionValue, sortField, sortDescending);
}
@ -203,6 +218,7 @@ public final class BucketsQueryBuilder {
Objects.equals(includeInterim, other.includeInterim) &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(timestamp, other.timestamp) &&
Objects.equals(anomalyScoreFilter, other.anomalyScoreFilter) &&
Objects.equals(normalizedProbability, other.normalizedProbability) &&
Objects.equals(partitionValue, other.partitionValue) &&

View File

@ -19,9 +19,10 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -86,6 +87,7 @@ import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class JobProvider {
private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);
@ -332,6 +334,7 @@ public class JobProvider {
/**
* Get the job's data counts
*
* @param jobId The job id
* @return The dataCounts or default constructed object if not found
*/
@ -361,45 +364,114 @@ public class JobProvider {
/**
* Search for buckets with the parameters in the {@link BucketsQueryBuilder}
* @return QueryPage of Buckets
* @throws ResourceNotFoundException If the job id is no recognised
*/
public QueryPage<Bucket> buckets(String jobId, BucketsQuery query)
public void buckets(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler)
throws ResourceNotFoundException {
QueryBuilder fb = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.score(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbability())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim())
.build();
ResultsFilterBuilder rfb = new ResultsFilterBuilder();
if (query.getTimestamp() != null) {
rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getTimestamp());
} else {
rfb.timeRange(Bucket.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
.score(Bucket.MAX_NORMALIZED_PROBABILITY.getPreferredName(), query.getNormalizedProbability())
.interim(Bucket.IS_INTERIM.getPreferredName(), query.isIncludeInterim());
}
SortBuilder<?> sortBuilder = new FieldSortBuilder(query.getSortField())
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
QueryPage<Bucket> buckets = buckets(jobId, query.isIncludeInterim(), query.getFrom(), query.getSize(), fb, sortBuilder);
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(rfb.build())
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(Result.TYPE.getPreferredName());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sortBuilder);
searchSourceBuilder.query(boolQuery);
searchSourceBuilder.from(query.getFrom());
searchSourceBuilder.size(query.getSize());
searchRequest.source(searchSourceBuilder);
if (Strings.isNullOrEmpty(query.getPartitionValue())) {
for (Bucket b : buckets.results()) {
if (query.isExpand() && b.getRecordCount() > 0) {
expandBucket(jobId, query.isIncludeInterim(), b);
}
}
} else {
List<PerPartitionMaxProbabilities> scores =
partitionMaxNormalizedProbabilities(jobId, query.getStart(), query.getEnd(), query.getPartitionValue());
mergePartitionScoresIntoBucket(scores, buckets.results(), query.getPartitionValue());
for (Bucket b : buckets.results()) {
if (query.isExpand() && b.getRecordCount() > 0) {
this.expandBucketForPartitionValue(jobId, query.isIncludeInterim(), b, query.getPartitionValue());
}
b.setAnomalyScore(b.partitionAnomalyScore(query.getPartitionValue()));
}
MultiSearchRequest mrequest = new MultiSearchRequest();
mrequest.add(searchRequest);
if (Strings.hasLength(query.getPartitionValue())) {
mrequest.add(createPartitionMaxNormailizedProbabilitiesRequest(jobId, query.getStart(), query.getEnd(),
query.getPartitionValue()));
}
return buckets;
client.multiSearch(mrequest, ActionListener.wrap(mresponse -> {
MultiSearchResponse.Item item1 = mresponse.getResponses()[0];
if (item1.isFailure()) {
Exception e = item1.getFailure();
if (e instanceof IndexNotFoundException) {
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
} else {
errorHandler.accept(e);
}
return;
}
SearchResponse searchResponse = item1.getResponse();
SearchHits hits = searchResponse.getHits();
if (query.getTimestamp() != null) {
if (hits.getTotalHits() == 0) {
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
} else if (hits.getTotalHits() > 1) {
LOGGER.error("Found more than one bucket with timestamp [{}]" + " from index {}", query.getTimestamp(), indexName);
}
}
List<Bucket> results = new ArrayList<>();
for (SearchHit hit : hits.getHits()) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
}
Bucket bucket = Bucket.PARSER.apply(parser, () -> parseFieldMatcher);
if (query.isIncludeInterim() || bucket.isInterim() == false) {
results.add(bucket);
}
}
if (query.getTimestamp() != null && results.isEmpty()) {
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
}
QueryPage<Bucket> buckets = new QueryPage<>(results, searchResponse.getHits().getTotalHits(), Bucket.RESULTS_FIELD);
if (Strings.hasLength(query.getPartitionValue())) {
MultiSearchResponse.Item item2 = mresponse.getResponses()[1];
if (item2.isFailure()) {
Exception e = item2.getFailure();
if (e instanceof IndexNotFoundException) {
errorHandler.accept(ExceptionsHelper.missingJobException(jobId));
} else {
errorHandler.accept(e);
}
return;
}
List<PerPartitionMaxProbabilities> partitionProbs =
handlePartitionMaxNormailizedProbabilitiesResponse(item2.getResponse());
mergePartitionScoresIntoBucket(partitionProbs, buckets.results(), query.getPartitionValue());
for (Bucket b : buckets.results()) {
if (query.isExpand() && b.getRecordCount() > 0) {
this.expandBucketForPartitionValue(jobId, query.isIncludeInterim(), b, query.getPartitionValue());
}
b.setAnomalyScore(b.partitionAnomalyScore(query.getPartitionValue()));
}
} else {
for (Bucket b : buckets.results()) {
if (query.isExpand() && b.getRecordCount() > 0) {
expandBucket(jobId, query.isIncludeInterim(), b);
}
}
}
handler.accept(buckets);
}, errorHandler));
}
void mergePartitionScoresIntoBucket(List<PerPartitionMaxProbabilities> partitionProbs, List<Bucket> buckets, String partitionValue) {
@ -419,136 +491,8 @@ public class JobProvider {
}
}
private QueryPage<Bucket> buckets(String jobId, boolean includeInterim, int from, int size,
QueryBuilder fb, SortBuilder<?> sb) throws ResourceNotFoundException {
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(fb)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
SearchResponse searchResponse;
try {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}",
Bucket.RESULT_TYPE_VALUE, indexName, from, size);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(Result.TYPE.getPreferredName());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.sort(sb);
searchSourceBuilder.query(new ConstantScoreQueryBuilder(boolQuery));
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
searchRequest.source(searchSourceBuilder);
searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
List<Bucket> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
}
Bucket bucket = Bucket.PARSER.apply(parser, () -> parseFieldMatcher);
if (includeInterim || bucket.isInterim() == false) {
results.add(bucket);
}
}
return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), Bucket.RESULTS_FIELD);
}
/**
* Get the bucket at time <code>timestampMillis</code> from the job.
*
* @param jobId the job id
* @param query The bucket query
* @return QueryPage Bucket
* @throws ResourceNotFoundException If the job id is not recognised
*/
public QueryPage<Bucket> bucket(String jobId, BucketQueryBuilder.BucketQuery query) throws ResourceNotFoundException {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchHits hits;
try {
LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName);
QueryBuilder matchQuery = QueryBuilders.matchQuery(Bucket.TIMESTAMP.getPreferredName(), query.getTimestamp());
QueryBuilder boolQuery = new BoolQueryBuilder()
.filter(matchQuery)
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(Result.TYPE.getPreferredName());
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolQuery);
sourceBuilder.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC));
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
hits = searchResponse.getHits();
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
if (hits.getTotalHits() == 0) {
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
} else if (hits.getTotalHits() > 1L) {
LOGGER.error("Found more than one bucket with timestamp [" + query.getTimestamp() + "]" +
" from index " + indexName);
}
SearchHit hit = hits.getAt(0);
BytesReference source = hit.getSourceRef();
XContentParser parser;
try {
parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
}
Bucket bucket = Bucket.PARSER.apply(parser, () -> parseFieldMatcher);
// don't return interim buckets if not requested
if (bucket.isInterim() && query.isIncludeInterim() == false) {
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
}
if (Strings.isNullOrEmpty(query.getPartitionValue())) {
if (query.isExpand() && bucket.getRecordCount() > 0) {
expandBucket(jobId, query.isIncludeInterim(), bucket);
}
} else {
List<PerPartitionMaxProbabilities> partitionProbs =
partitionMaxNormalizedProbabilities(jobId, query.getTimestamp(), query.getTimestamp() + 1, query.getPartitionValue());
if (partitionProbs.size() > 1) {
LOGGER.error("Found more than one PerPartitionMaxProbabilities with timestamp [" + query.getTimestamp() + "]" +
" from index " + indexName);
}
if (partitionProbs.size() > 0) {
bucket.setMaxNormalizedProbability(partitionProbs.get(0).getMaxProbabilityForPartition(query.getPartitionValue()));
}
if (query.isExpand() && bucket.getRecordCount() > 0) {
this.expandBucketForPartitionValue(jobId, query.isIncludeInterim(),
bucket, query.getPartitionValue());
}
bucket.setAnomalyScore(
bucket.partitionAnomalyScore(query.getPartitionValue()));
}
return new QueryPage<>(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD);
}
private List<PerPartitionMaxProbabilities> partitionMaxNormalizedProbabilities(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue)
throws ResourceNotFoundException {
private SearchRequest createPartitionMaxNormailizedProbabilitiesRequest(String jobId, Object epochStart, Object epochEnd,
String partitionFieldValue) {
QueryBuilder timeRangeQuery = new ResultsFilterBuilder()
.timeRange(Bucket.TIMESTAMP.getPreferredName(), epochStart, epochEnd)
.build();
@ -560,21 +504,16 @@ public class JobProvider {
FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
SearchRequestBuilder searchBuilder = client
.prepareSearch(indexName)
.setQuery(boolQuery)
.addSort(sb)
.setTypes(Result.TYPE.getPreferredName());
SearchResponse searchResponse;
try {
searchResponse = searchBuilder.get();
} catch (IndexNotFoundException e) {
throw ExceptionsHelper.missingJobException(jobId);
}
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.sort(sb);
sourceBuilder.query(boolQuery);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(sourceBuilder);
return searchRequest;
}
private List<PerPartitionMaxProbabilities> handlePartitionMaxNormailizedProbabilitiesResponse(SearchResponse searchResponse) {
List<PerPartitionMaxProbabilities> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
XContentParser parser;
@ -583,14 +522,13 @@ public class JobProvider {
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse PerPartitionMaxProbabilities", e);
}
results.add(PerPartitionMaxProbabilities.PARSER.apply(parser, () -> parseFieldMatcher));
}
return results;
}
public int expandBucketForPartitionValue(String jobId, boolean includeInterim, Bucket bucket,
// TODO (norelease): Use scroll search instead of multiple searches with increasing from
private int expandBucketForPartitionValue(String jobId, boolean includeInterim, Bucket bucket,
String partitionFieldValue) throws ResourceNotFoundException {
int from = 0;
@ -625,11 +563,12 @@ public class JobProvider {
/**
* Expand a bucket to include the associated records.
*
* @param jobId the job id
* @param jobId the job id
* @param includeInterim Include interim results
* @param bucket The bucket to be expanded
* @param bucket The bucket to be expanded
* @return The number of records added to the bucket
*/
// TODO (norelease): Use scroll search instead of multiple searches with increasing from
public int expandBucket(String jobId, boolean includeInterim, Bucket bucket) throws ResourceNotFoundException {
int from = 0;
@ -680,8 +619,8 @@ public class JobProvider {
* Get a page of {@linkplain CategoryDefinition}s for the given <code>jobId</code>.
*
* @param jobId the job id
* @param from Skip the first N categories. This parameter is for paging
* @param size Take only this number of categories
* @param from Skip the first N categories. This parameter is for paging
* @param size Take only this number of categories
* @return QueryPage of CategoryDefinition
*/
public QueryPage<CategoryDefinition> categoryDefinitions(String jobId, int from, int size) {
@ -719,7 +658,7 @@ public class JobProvider {
/**
* Get the specific CategoryDefinition for the given job and category id.
*
* @param jobId the job id
* @param jobId the job id
* @param categoryId Unique id
* @return QueryPage CategoryDefinition
*/
@ -756,6 +695,7 @@ public class JobProvider {
/**
* Search for anomaly records with the parameters in the
* {@link org.elasticsearch.xpack.prelert.job.persistence.RecordsQueryBuilder.RecordsQuery}
*
* @return QueryPage of AnomalyRecords
*/
public QueryPage<AnomalyRecord> records(String jobId, RecordsQueryBuilder.RecordsQuery query)
@ -815,7 +755,7 @@ public class JobProvider {
SearchResponse searchResponse;
try {
LOGGER.trace("ES API CALL: search all of result type {} from index {}{}{} with filter after sort from {} size {}",
AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
AnomalyRecord.RESULT_TYPE_VALUE, indexName, (sb != null) ? " with sort" : "",
secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest);
@ -843,10 +783,8 @@ public class JobProvider {
* Return a page of influencers for the given job and within the given date
* range
*
* @param jobId
* The job ID for which influencers are requested
* @param query
* the query
* @param jobId The job ID for which influencers are requested
* @param query the query
* @return QueryPage of Influencer
*/
public QueryPage<Influencer> influencers(String jobId, InfluencersQuery query) throws ResourceNotFoundException {
@ -905,7 +843,7 @@ public class JobProvider {
/**
* Get the influencer for the given job for id
*
* @param jobId the job id
* @param jobId the job id
* @param influencerId The unique influencer Id
* @return Optional Influencer
*/
@ -1022,7 +960,7 @@ public class JobProvider {
try {
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
ModelSnapshot.TYPE, indexName, sortField, from, size);
ModelSnapshot.TYPE, indexName, sortField, from, size);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
@ -1059,6 +997,7 @@ public class JobProvider {
* Given a model snapshot, get the corresponding state and write it to the supplied
* stream. If there are multiple state documents they are separated using <code>'\0'</code>
* when written to the stream.
*
* @param jobId the job id
* @param modelSnapshot the model snapshot to be restored
* @param restoreStream the stream to write the state to
@ -1233,6 +1172,6 @@ public class JobProvider {
* @return the {@code Auditor}
*/
public Auditor audit(String jobId) {
return new Auditor(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), jobId);
return new Auditor(client, AnomalyDetectorsIndex.jobResultsIndexName(jobId), jobId);
}
}

View File

@ -5,9 +5,6 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
@ -15,20 +12,23 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import java.util.ArrayList;
import java.util.List;
/**
* This builder facilitates the creation of a {@link QueryBuilder} with common
* characteristics to both buckets and records.
*/
class ResultsFilterBuilder {
private final List<QueryBuilder> filters;
private final List<QueryBuilder> queries;
ResultsFilterBuilder() {
filters = new ArrayList<>();
queries = new ArrayList<>();
}
ResultsFilterBuilder(QueryBuilder filterBuilder) {
ResultsFilterBuilder(QueryBuilder queryBuilder) {
this();
filters.add(filterBuilder);
queries.add(queryBuilder);
}
ResultsFilterBuilder timeRange(String field, Object start, Object end) {
@ -40,16 +40,21 @@ class ResultsFilterBuilder {
if (end != null) {
timeRange.lt(end);
}
addFilter(timeRange);
addQuery(timeRange);
}
return this;
}
ResultsFilterBuilder timeRange(String field, String timestamp) {
addQuery(QueryBuilders.matchQuery(field, timestamp));
return this;
}
ResultsFilterBuilder score(String fieldName, double threshold) {
if (threshold > 0.0) {
RangeQueryBuilder scoreFilter = QueryBuilders.rangeQuery(fieldName);
scoreFilter.gte(threshold);
addFilter(scoreFilter);
addQuery(scoreFilter);
}
return this;
}
@ -64,12 +69,12 @@ class ResultsFilterBuilder {
// Implemented as "NOT isInterim == true" so that not present and null
// are equivalent to false. This improves backwards compatibility.
// Also, note how for a boolean field, unlike numeric term filters, the
// Also, note how for a boolean field, unlike numeric term queries, the
// term value is supplied as a string.
TermQueryBuilder interimFilter = QueryBuilders.termQuery(fieldName,
Boolean.TRUE.toString());
QueryBuilder notInterimFilter = QueryBuilders.boolQuery().mustNot(interimFilter);
addFilter(notInterimFilter);
addQuery(notInterimFilter);
return this;
}
@ -79,23 +84,23 @@ class ResultsFilterBuilder {
}
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(fieldName, fieldValue);
addFilter(termQueryBuilder);
addQuery(termQueryBuilder);
return this;
}
private void addFilter(QueryBuilder fb) {
filters.add(fb);
private void addQuery(QueryBuilder fb) {
queries.add(fb);
}
public QueryBuilder build() {
if (filters.isEmpty()) {
if (queries.isEmpty()) {
return QueryBuilders.matchAllQuery();
}
if (filters.size() == 1) {
return filters.get(0);
if (queries.size() == 1) {
return queries.get(0);
}
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
for (QueryBuilder query : filters) {
for (QueryBuilder query : queries) {
boolQueryBuilder.filter(query);
}
return boolQueryBuilder;

View File

@ -30,7 +30,6 @@ import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -72,43 +71,66 @@ public class ScheduledJobRunner extends AbstractComponent {
}
Scheduler scheduler = prelertMetadata.getScheduler(schedulerId);
logger.info("Starting scheduler [{}] for job [{}]", schedulerId, scheduler.getJobId());
Job job = prelertMetadata.getJobs().get(scheduler.getJobId());
Holder holder = createJobScheduler(scheduler, job, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.scheduledJob.runLookBack(startTime, endTime);
} catch (ScheduledJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + job.getId() + "]", e);
holder.stop(e);
return;
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
jobProvider.buckets(job.getId(), latestBucketQuery, buckets -> {
long latestFinalBucketEndMs = -1L;
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
}
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
innerRun(scheduler, job, startTime, endTime, task, latestFinalBucketEndMs, handler);
}, e -> {
if (e instanceof ResourceNotFoundException) {
innerRun(scheduler, job, startTime, endTime, task, -1, handler);
} else {
holder.stop(null);
holder.problemTracker.finishReport();
handler.accept(e);
}
});
});
}
private void innerRun(Scheduler scheduler, Job job, long startTime, Long endTime, StartSchedulerAction.SchedulerTask task,
long latestFinalBucketEndMs, Consumer<Exception> handler) {
logger.info("Starting scheduler [{}] for job [{}]", scheduler.getId(), scheduler.getJobId());
Holder holder = createJobScheduler(scheduler, job, latestFinalBucketEndMs, handler);
task.setHolder(holder);
holder.future = threadPool.executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME).submit(() -> {
Long next = null;
try {
next = holder.scheduledJob.runLookBack(startTime, endTime);
} catch (ScheduledJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
} catch (ScheduledJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
} catch (ScheduledJob.EmptyDataCountException e) {
if (endTime == null && holder.problemTracker.updateEmptyDataCount(true) == false) {
next = e.nextDelayInMsSinceEpoch;
}
} catch (Exception e) {
logger.error("Failed lookback import for job [" + job.getId() + "]", e);
holder.stop(e);
return;
}
if (next != null) {
doScheduleRealtime(next, job.getId(), holder);
} else {
holder.stop(null);
holder.problemTracker.finishReport();
}
});
}
private void doScheduleRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) {
if (holder.isRunning()) {
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
@ -168,37 +190,16 @@ public class ScheduledJobRunner extends AbstractComponent {
ScheduledJobValidator.validate(scheduler.getConfig(), job);
}
private Holder createJobScheduler(Scheduler scheduler, Job job, Consumer<Exception> handler) {
private Holder createJobScheduler(Scheduler scheduler, Job job, long latestFinalBucketEndMs, Consumer<Exception> handler) {
Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(scheduler.getConfig(), job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, client, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
getLatestRecordTimestamp(job.getId()));
dataExtractor, client, auditor, currentTimeSupplier, latestFinalBucketEndMs, getLatestRecordTimestamp(job.getId()));
return new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
}
private long getLatestFinalBucketEndTimeMs(Job job) {
Duration bucketSpan = Duration.ofSeconds(job.getAnalysisConfig().getBucketSpan());
long latestFinalBucketEndMs = -1L;
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.includeInterim(false)
.build();
QueryPage<Bucket> buckets;
try {
buckets = jobProvider.buckets(job.getId(), latestBucketQuery);
if (buckets.results().size() == 1) {
latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.toMillis() - 1;
}
} catch (ResourceNotFoundException e) {
logger.error("Could not retrieve latest bucket timestamp", e);
}
return latestFinalBucketEndMs;
}
private long getLatestRecordTimestamp(String jobId) {
long latestRecordTimeMs = -1L;
DataCounts dataCounts = jobProvider.dataCounts(jobId);
@ -218,7 +219,7 @@ public class ScheduledJobRunner extends AbstractComponent {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
private void setJobSchedulerStatus(String schedulerId, SchedulerStatus status, Consumer<Exception> supplier) {
private void setJobSchedulerStatus(String schedulerId, SchedulerStatus status, Consumer<Exception> handler) {
UpdateSchedulerStatusAction.Request request = new UpdateSchedulerStatusAction.Request(schedulerId, status);
client.execute(UpdateSchedulerStatusAction.INSTANCE, request, new ActionListener<UpdateSchedulerStatusAction.Response>() {
@Override
@ -228,13 +229,13 @@ public class ScheduledJobRunner extends AbstractComponent {
} else {
logger.info("set scheduler [{}] status to [{}], but was not acknowledged", schedulerId, status);
}
supplier.accept(null);
handler.accept(null);
}
@Override
public void onFailure(Exception e) {
logger.error("could not set scheduler [" + schedulerId + "] status to [" + status + "]", e);
supplier.accept(e);
handler.accept(e);
}
});
}

View File

@ -53,6 +53,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
private static final String JOB_ID = "foo";
@ -71,7 +73,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
jobProvider = new JobProvider(client(), 1, matcher);
}
public void testProcessResults() throws IOException {
public void testProcessResults() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor =
@ -113,7 +115,8 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build();
QueryPage<Bucket> persistedBucket = getBucketQueryPage(bucketsQuery);
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
@ -149,7 +152,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertEquals(quantiles, persistedQuantiles.get());
}
public void testDeleteInterimResults() throws IOException, InterruptedException {
public void testDeleteInterimResults() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor =
@ -180,7 +183,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
@ -194,7 +197,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertEquals(0, persistedRecords.count());
}
public void testMultipleFlushesBetweenPersisting() throws IOException, InterruptedException {
public void testMultipleFlushesBetweenPersisting() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor =
@ -229,7 +232,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
// Records are not persisted to Elasticsearch as an array within the bucket
// documents, so remove them from the expected bucket before comparing
@ -240,7 +243,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertResultsAreSame(finalAnomalyRecords, persistedRecords);
}
public void testEndOfStreamTriggersPersisting() throws IOException, InterruptedException {
public void testEndOfStreamTriggersPersisting() throws Exception {
createJob();
AutoDetectResultProcessor resultProcessor =
@ -270,7 +273,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
resultProcessor.process(JOB_ID, inputStream, false);
jobResultsPersister.commitResultWrites(JOB_ID);
QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
assertEquals(1, persistedBucket.count());
QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID,
@ -447,4 +450,22 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
expectedSet.removeAll(actual.results());
assertEquals(0, expectedSet.size());
}
private QueryPage<Bucket> getBucketQueryPage(BucketsQueryBuilder.BucketsQuery bucketsQuery) throws Exception {
AtomicReference<Exception> errorHolder = new AtomicReference<>();
AtomicReference<QueryPage<Bucket>> resultHolder = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
jobProvider.buckets(JOB_ID, bucketsQuery, r -> {
resultHolder.set(r);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
latch.await();
if (errorHolder.get() != null) {
throw errorHolder.get();
}
return resultHolder.get();
}
}

View File

@ -1,64 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assert;
public class BucketQueryBuilderTests extends ESTestCase {
public void testDefaultBuild() throws Exception {
BucketQueryBuilder.BucketQuery query = new BucketQueryBuilder("1000").build();
Assert.assertEquals("1000", query.getTimestamp());
assertEquals(false, query.isIncludeInterim());
assertEquals(false, query.isExpand());
assertEquals(null, query.getPartitionValue());
}
public void testDefaultAll() throws Exception {
BucketQueryBuilder.BucketQuery query =
new BucketQueryBuilder("1000")
.expand(true)
.includeInterim(true)
.partitionValue("p")
.build();
Assert.assertEquals("1000", query.getTimestamp());
assertEquals(true, query.isIncludeInterim());
assertEquals(true, query.isExpand());
assertEquals("p", query.getPartitionValue());
}
public void testEqualsHash() throws Exception {
BucketQueryBuilder.BucketQuery query =
new BucketQueryBuilder("1000")
.expand(true)
.includeInterim(true)
.partitionValue("p")
.build();
BucketQueryBuilder.BucketQuery query2 =
new BucketQueryBuilder("1000")
.expand(true)
.includeInterim(true)
.partitionValue("p")
.build();
assertEquals(query2, query);
assertEquals(query2.hashCode(), query.hashCode());
query2 =
new BucketQueryBuilder("1000")
.expand(true)
.includeInterim(true)
.partitionValue("q")
.build();
assertFalse(query2.equals(query));
assertFalse(query2.hashCode() == query.hashCode());
}
}

View File

@ -10,6 +10,8 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
@ -51,6 +53,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.prelert.job.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -63,12 +66,12 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public class JobProviderTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";
private static final String STATE_INDEX_NAME = ".ml-state";
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenNoQuantilesForJob() throws Exception {
GetResponse getResponse = createGetResponse(false, null);
@ -82,6 +85,7 @@ public class JobProviderTests extends ESTestCase {
assertFalse(quantiles.isPresent());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenQuantilesHaveNonEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
@ -100,6 +104,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals("state", quantiles.get().getQuantileState());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testGetQuantiles_GivenQuantilesHaveEmptyState() throws Exception {
Map<String, Object> source = new HashMap<>();
source.put(Job.ID.getPreferredName(), "foo");
@ -330,23 +335,22 @@ public class JobProviderTests extends ESTestCase {
map.put("bucket_span", 22);
source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source);
int from = 0;
int size = 10;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response);
JobProvider provider = createProvider(client);
BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(0.0)
.normalizedProbabilityThreshold(1.0);
QueryPage<Bucket> buckets = provider.buckets(jobId, bq.build());
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Bucket>[] holder = new QueryPage[1];
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
QueryPage<Bucket> buckets = holder[0];
assertEquals(1L, buckets.count());
QueryBuilder query = queryBuilder.getValue();
QueryBuilder query = queryBuilderHolder[0];
String queryString = query.toString();
assertTrue(
queryString.matches("(?s).*max_normalized_probability[^}]*from. : 1\\.0.*must_not[^}]*term[^}]*is_interim.*value. : .true" +
@ -365,23 +369,23 @@ public class JobProviderTests extends ESTestCase {
map.put("bucket_span", 22);
source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source);
int from = 99;
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
JobProvider provider = createProvider(client);
BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(5.1)
.normalizedProbabilityThreshold(10.9).includeInterim(true);
QueryPage<Bucket> buckets = provider.buckets(jobId, bq.build());
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Bucket>[] holder = new QueryPage[1];
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
QueryPage<Bucket> buckets = holder[0];
assertEquals(1L, buckets.count());
QueryBuilder query = queryBuilder.getValue();
QueryBuilder query = queryBuilderHolder[0];
String queryString = query.toString();
assertTrue(queryString.matches("(?s).*max_normalized_probability[^}]*from. : 10\\.9.*"));
assertTrue(queryString.matches("(?s).*anomaly_score[^}]*from. : 5\\.1.*"));
@ -400,15 +404,12 @@ public class JobProviderTests extends ESTestCase {
map.put("bucket_span", 22);
source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
QueryBuilder[] queryBuilderHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source);
int from = 99;
int size = 17;
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
JobProvider provider = createProvider(client);
BucketsQueryBuilder bq = new BucketsQueryBuilder();
@ -418,9 +419,12 @@ public class JobProviderTests extends ESTestCase {
bq.normalizedProbabilityThreshold(10.9);
bq.includeInterim(true);
QueryPage<Bucket> buckets = provider.buckets(jobId, bq.build());
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Bucket>[] holder = new QueryPage[1];
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);});
QueryPage<Bucket> buckets = holder[0];
assertEquals(1L, buckets.count());
QueryBuilder query = queryBuilder.getValue();
QueryBuilder query = queryBuilderHolder[0];
String queryString = query.toString();
assertTrue(queryString.matches("(?s).*max_normalized_probability[^}]*from. : 10\\.9.*"));
assertTrue(queryString.matches("(?s).*anomaly_score[^}]*from. : 5\\.1.*"));
@ -431,22 +435,18 @@ public class JobProviderTests extends ESTestCase {
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Long timestamp = 98765432123456789L;
Date now = new Date();
List<Map<String, Object>> source = new ArrayList<>();
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(false, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
BucketQueryBuilder bq = new BucketQueryBuilder(Long.toString(timestamp));
expectThrows(ResourceNotFoundException.class,
() -> provider.bucket(jobId, bq.build()));
BucketsQueryBuilder bq = new BucketsQueryBuilder();
bq.timestamp(Long.toString(timestamp));
Exception[] holder = new Exception[1];
provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;});
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
}
public void testBucket_OneBucketNoExpandNoInterim()
@ -461,20 +461,18 @@ public class JobProviderTests extends ESTestCase {
map.put("bucket_span", 22);
source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
BucketQueryBuilder bq = new BucketQueryBuilder(Long.toString(now.getTime()));
BucketsQueryBuilder bq = new BucketsQueryBuilder();
bq.timestamp(Long.toString(now.getTime()));
QueryPage<Bucket> bucketHolder = provider.bucket(jobId, bq.build());
assertThat(bucketHolder.count(), equalTo(1L));
Bucket b = bucketHolder.results().get(0);
@SuppressWarnings({"unchecked", "rawtypes"})
QueryPage<Bucket>[] bucketHolder = new QueryPage[1];
provider.buckets(jobId, bq.build(), q -> {bucketHolder[0] = q;}, e -> {});
assertThat(bucketHolder[0].count(), equalTo(1L));
Bucket b = bucketHolder[0].results().get(0);
assertEquals(now, b.getTimestamp());
}
@ -491,21 +489,19 @@ public class JobProviderTests extends ESTestCase {
map.put("is_interim", true);
source.add(map);
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
Result.TYPE.getPreferredName(), 0, 0, response, queryBuilder);
Client client = clientBuilder.build();
Client client = getMockedClient(queryBuilder -> {}, response);
JobProvider provider = createProvider(client);
BucketQueryBuilder bq = new BucketQueryBuilder(Long.toString(now.getTime()));
BucketsQueryBuilder bq = new BucketsQueryBuilder();
bq.timestamp(Long.toString(now.getTime()));
expectThrows(ResourceNotFoundException.class,
() -> provider.bucket(jobId, bq.build()));
Exception[] holder = new Exception[1];
provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;});
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRecords() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -557,6 +553,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRecords_UsingBuilder()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -615,6 +612,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testBucketRecords() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -665,6 +663,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals("irrascible", records.get(1).getFunction());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testexpandBucket() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
Date now = new Date();
@ -696,6 +695,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(400L, records);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testexpandBucket_WithManyRecords()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -731,6 +731,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(1200L, records);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testCategoryDefinitions()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -759,6 +760,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testCategoryDefinition()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentification";
@ -783,6 +785,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencers_NoInterim()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
@ -847,6 +850,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencers_WithInterim()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
@ -911,6 +915,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(5.0, records.get(1).getInitialAnomalyScore(), 0.00001);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testInfluencer() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
String influencerId = "ThisIsAnInfluencerId";
@ -927,6 +932,7 @@ public class JobProviderTests extends ESTestCase {
}
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testModelSnapshots() throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
Date now = new Date();
@ -982,6 +988,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(6, snapshots.get(1).getSnapshotDocCount());
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testModelSnapshots_WithDescription()
throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers";
@ -1115,6 +1122,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(0.0, buckets.get(3).getMaxNormalizedProbability(), 0.001);
}
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRestoreStateToStream() throws Exception {
Map<String, Object> categorizerState = new HashMap<>();
categorizerState.put("catName", "catVal");
@ -1195,4 +1203,19 @@ public class JobProviderTests extends ESTestCase {
return response;
}
private Client getMockedClient(Consumer<QueryBuilder> queryBuilderConsumer, SearchResponse response) {
Client client = mock(Client.class);
doAnswer(invocationOnMock -> {
MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocationOnMock.getArguments()[0];
queryBuilderConsumer.accept(multiSearchRequest.requests().get(0).source().query());
@SuppressWarnings("unchecked")
ActionListener<MultiSearchResponse> actionListener = (ActionListener<MultiSearchResponse>) invocationOnMock.getArguments()[1];
MultiSearchResponse mresponse =
new MultiSearchResponse(new MultiSearchResponse.Item[]{new MultiSearchResponse.Item(response, null)});
actionListener.onResponse(mresponse);
return null;
}).when(client).multiSearch(any(), any());
return client;
}
}

View File

@ -33,9 +33,7 @@ import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.junit.Before;
import java.io.ByteArrayInputStream;
@ -104,8 +102,12 @@ public class ScheduledJobRunnerTests extends ESTestCase {
() -> currentTime);
when(jobProvider.audit(anyString())).thenReturn(auditor);
when(jobProvider.buckets(anyString(), any(BucketsQueryBuilder.BucketsQuery.class))).thenThrow(
QueryPage.emptyQueryPage(Job.RESULTS_FIELD));
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
Consumer consumer = (Consumer) invocationOnMock.getArguments()[3];
consumer.accept(new ResourceNotFoundException("dummy"));
return null;
}).when(jobProvider).buckets(any(), any(), any(), any());
}
public void testStart_GivenNewlyCreatedJobLoopBack() throws Exception {