mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
[ML] Refactor [Bucket|Record]QueryBuilder classes (elastic/x-pack-elasticsearch#2684)
Those classes used to be elasticsearch-agnostic wrappers of the query parameters. However, we now do not need that layer of abstraction. Instead we can make those builders own the building of the SearchSourceBuilder, which simplifies the JobProvider and makes them reusable. Original commit: elastic/x-pack-elasticsearch@b079cce1d6
This commit is contained in:
parent
45c62cca63
commit
686eb0ab65
@ -410,7 +410,7 @@ public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucket
|
||||
query.start(request.start);
|
||||
query.end(request.end);
|
||||
}
|
||||
jobProvider.buckets(request.jobId, query.build(), q -> listener.onResponse(new Response(q)), listener::onFailure, client);
|
||||
jobProvider.buckets(request.jobId, query, q -> listener.onResponse(new Response(q)), listener::onFailure, client);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -323,7 +323,7 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord
|
||||
|
||||
jobManager.getJobOrThrowIfUnknown(request.getJobId());
|
||||
|
||||
RecordsQueryBuilder.RecordsQuery query = new RecordsQueryBuilder()
|
||||
RecordsQueryBuilder query = new RecordsQueryBuilder()
|
||||
.includeInterim(request.excludeInterim == false)
|
||||
.epochStart(request.start)
|
||||
.epochEnd(request.end)
|
||||
@ -331,8 +331,7 @@ public class GetRecordsAction extends Action<GetRecordsAction.Request, GetRecord
|
||||
.size(request.pageParams.getSize())
|
||||
.recordScore(request.recordScoreFilter)
|
||||
.sortField(request.sort)
|
||||
.sortDescending(request.descending)
|
||||
.build();
|
||||
.sortDescending(request.descending);
|
||||
jobProvider.records(request.jobId, query, page -> listener.onResponse(new Response(page)), listener::onFailure, client);
|
||||
}
|
||||
}
|
||||
|
@ -83,11 +83,10 @@ public class DatafeedJobBuilder {
|
||||
};
|
||||
|
||||
// Step 1. Collect latest bucket
|
||||
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
|
||||
BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder()
|
||||
.sortField(Result.TIMESTAMP.getPreferredName())
|
||||
.sortDescending(true).size(1)
|
||||
.includeInterim(false)
|
||||
.build();
|
||||
.includeInterim(false);
|
||||
jobProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> {
|
||||
if (e instanceof ResourceNotFoundException) {
|
||||
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
|
||||
|
@ -5,11 +5,16 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.xpack.ml.job.results.Bucket;
|
||||
import org.elasticsearch.xpack.ml.job.results.Result;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* One time query builder for buckets.
|
||||
* <ul>
|
||||
@ -34,52 +39,59 @@ import java.util.Objects;
|
||||
public final class BucketsQueryBuilder {
|
||||
public static final int DEFAULT_SIZE = 100;
|
||||
|
||||
private BucketsQuery bucketsQuery = new BucketsQuery();
|
||||
private int from = 0;
|
||||
private int size = DEFAULT_SIZE;
|
||||
private boolean expand = false;
|
||||
private boolean includeInterim = false;
|
||||
private double anomalyScoreFilter = 0.0;
|
||||
private String start;
|
||||
private String end;
|
||||
private String timestamp;
|
||||
private String sortField = Result.TIMESTAMP.getPreferredName();
|
||||
private boolean sortDescending = false;
|
||||
|
||||
public BucketsQueryBuilder from(int from) {
|
||||
bucketsQuery.from = from;
|
||||
this.from = from;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder size(int size) {
|
||||
bucketsQuery.size = size;
|
||||
this.size = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder expand(boolean expand) {
|
||||
bucketsQuery.expand = expand;
|
||||
this.expand = expand;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isExpand() {
|
||||
return expand;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder includeInterim(boolean include) {
|
||||
bucketsQuery.includeInterim = include;
|
||||
this.includeInterim = include;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isIncludeInterim() {
|
||||
return includeInterim;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder anomalyScoreThreshold(Double anomalyScoreFilter) {
|
||||
if (anomalyScoreFilter != null) {
|
||||
bucketsQuery.anomalyScoreFilter = anomalyScoreFilter;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param partitionValue Not set if null or empty
|
||||
*/
|
||||
public BucketsQueryBuilder partitionValue(String partitionValue) {
|
||||
if (!Strings.isNullOrEmpty(partitionValue)) {
|
||||
bucketsQuery.partitionValue = partitionValue;
|
||||
this.anomalyScoreFilter = anomalyScoreFilter;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder sortField(String sortField) {
|
||||
bucketsQuery.sortField = sortField;
|
||||
this.sortField = sortField;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder sortDescending(boolean sortDescending) {
|
||||
bucketsQuery.sortDescending = sortDescending;
|
||||
this.sortDescending = sortDescending;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -87,7 +99,7 @@ public final class BucketsQueryBuilder {
|
||||
* If startTime <= 0 the parameter is not set
|
||||
*/
|
||||
public BucketsQueryBuilder start(String startTime) {
|
||||
bucketsQuery.start = startTime;
|
||||
this.start = startTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -95,121 +107,52 @@ public final class BucketsQueryBuilder {
|
||||
* If endTime <= 0 the parameter is not set
|
||||
*/
|
||||
public BucketsQueryBuilder end(String endTime) {
|
||||
bucketsQuery.end = endTime;
|
||||
this.end = endTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder timestamp(String timestamp) {
|
||||
bucketsQuery.timestamp = timestamp;
|
||||
bucketsQuery.size = 1;
|
||||
this.timestamp = timestamp;
|
||||
this.size = 1;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BucketsQueryBuilder.BucketsQuery build() {
|
||||
if (bucketsQuery.timestamp != null && (bucketsQuery.start != null || bucketsQuery.end != null)) {
|
||||
public boolean hasTimestamp() {
|
||||
return timestamp != null;
|
||||
}
|
||||
|
||||
public SearchSourceBuilder build() {
|
||||
if (timestamp != null && (start != null || end != null)) {
|
||||
throw new IllegalStateException("Either specify timestamp or start/end");
|
||||
}
|
||||
|
||||
return bucketsQuery;
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
bucketsQuery = new BucketsQueryBuilder.BucketsQuery();
|
||||
}
|
||||
|
||||
|
||||
public class BucketsQuery {
|
||||
private int from = 0;
|
||||
private int size = DEFAULT_SIZE;
|
||||
private boolean expand = false;
|
||||
private boolean includeInterim = false;
|
||||
private double anomalyScoreFilter = 0.0;
|
||||
private String start;
|
||||
private String end;
|
||||
private String timestamp;
|
||||
private String partitionValue = null;
|
||||
private String sortField = Result.TIMESTAMP.getPreferredName();
|
||||
private boolean sortDescending = false;
|
||||
|
||||
public int getFrom() {
|
||||
return from;
|
||||
ResultsFilterBuilder rfb = new ResultsFilterBuilder();
|
||||
if (hasTimestamp()) {
|
||||
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), timestamp);
|
||||
} else {
|
||||
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), start, end)
|
||||
.score(Bucket.ANOMALY_SCORE.getPreferredName(), anomalyScoreFilter)
|
||||
.interim(includeInterim);
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean isExpand() {
|
||||
return expand;
|
||||
}
|
||||
|
||||
public boolean isIncludeInterim() {
|
||||
return includeInterim;
|
||||
}
|
||||
|
||||
public double getAnomalyScoreFilter() {
|
||||
return anomalyScoreFilter;
|
||||
}
|
||||
|
||||
public String getStart() {
|
||||
return start;
|
||||
}
|
||||
|
||||
public String getEnd() {
|
||||
return end;
|
||||
}
|
||||
|
||||
public String getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Null if not set
|
||||
*/
|
||||
public String getPartitionValue() {
|
||||
return partitionValue;
|
||||
}
|
||||
|
||||
public String getSortField() {
|
||||
return sortField;
|
||||
}
|
||||
|
||||
public boolean isSortDescending() {
|
||||
return sortDescending;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(from, size, expand, includeInterim, anomalyScoreFilter, start, end,
|
||||
timestamp, partitionValue, sortField, sortDescending);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BucketsQuery other = (BucketsQuery) obj;
|
||||
return Objects.equals(from, other.from) &&
|
||||
Objects.equals(size, other.size) &&
|
||||
Objects.equals(expand, other.expand) &&
|
||||
Objects.equals(includeInterim, other.includeInterim) &&
|
||||
Objects.equals(start, other.start) &&
|
||||
Objects.equals(end, other.end) &&
|
||||
Objects.equals(timestamp, other.timestamp) &&
|
||||
Objects.equals(anomalyScoreFilter, other.anomalyScoreFilter) &&
|
||||
Objects.equals(partitionValue, other.partitionValue) &&
|
||||
Objects.equals(sortField, other.sortField) &&
|
||||
this.sortDescending == other.sortDescending;
|
||||
SortBuilder<?> sortBuilder = new FieldSortBuilder(sortField)
|
||||
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
||||
QueryBuilder boolQuery = new BoolQueryBuilder()
|
||||
.filter(rfb.build())
|
||||
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
|
||||
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.sort(sortBuilder);
|
||||
searchSourceBuilder.query(boolQuery);
|
||||
searchSourceBuilder.from(from);
|
||||
searchSourceBuilder.size(size);
|
||||
|
||||
// If not using the default sort field (timestamp) add it as a secondary sort
|
||||
if (Result.TIMESTAMP.getPreferredName().equals(sortField) == false) {
|
||||
searchSourceBuilder.sort(Result.TIMESTAMP.getPreferredName(), sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.xpack.ml.MlMetaIndex;
|
||||
@ -60,7 +59,6 @@ import org.elasticsearch.xpack.ml.action.GetRecordsAction;
|
||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.ml.job.config.MlFilter;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
|
||||
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
|
||||
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
|
||||
@ -93,15 +91,6 @@ import java.util.function.Supplier;
|
||||
public class JobProvider {
|
||||
private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);
|
||||
|
||||
private static final List<String> SECONDARY_SORT = Arrays.asList(
|
||||
AnomalyRecord.RECORD_SCORE.getPreferredName(),
|
||||
AnomalyRecord.OVER_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.BY_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.FIELD_NAME.getPreferredName(),
|
||||
AnomalyRecord.FUNCTION.getPreferredName()
|
||||
);
|
||||
|
||||
private static final int RECORDS_SIZE_PARAM = 10000;
|
||||
private static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
|
||||
private static final double ESTABLISHED_MEMORY_CV_THRESHOLD = 0.1;
|
||||
@ -449,7 +438,7 @@ public class JobProvider {
|
||||
* Search for buckets with the parameters in the {@link BucketsQueryBuilder}
|
||||
* Uses the internal client, so runs as the _xpack user
|
||||
*/
|
||||
public void bucketsViaInternalClient(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler,
|
||||
public void bucketsViaInternalClient(String jobId, BucketsQueryBuilder query, Consumer<QueryPage<Bucket>> handler,
|
||||
Consumer<Exception> errorHandler) {
|
||||
buckets(jobId, query, handler, errorHandler, client);
|
||||
}
|
||||
@ -458,62 +447,28 @@ public class JobProvider {
|
||||
* Search for buckets with the parameters in the {@link BucketsQueryBuilder}
|
||||
* Uses a supplied client, so may run as the currently authenticated user
|
||||
*/
|
||||
public void buckets(String jobId, BucketsQuery query, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler,
|
||||
public void buckets(String jobId, BucketsQueryBuilder query, Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler,
|
||||
Client client) throws ResourceNotFoundException {
|
||||
|
||||
ResultsFilterBuilder rfb = new ResultsFilterBuilder();
|
||||
if (query.getTimestamp() != null) {
|
||||
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getTimestamp());
|
||||
} else {
|
||||
rfb.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
|
||||
.score(Bucket.ANOMALY_SCORE.getPreferredName(), query.getAnomalyScoreFilter())
|
||||
.interim(query.isIncludeInterim());
|
||||
}
|
||||
|
||||
SortBuilder<?> sortBuilder = new FieldSortBuilder(query.getSortField())
|
||||
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
|
||||
|
||||
QueryBuilder boolQuery = new BoolQueryBuilder()
|
||||
.filter(rfb.build())
|
||||
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE));
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.sort(sortBuilder);
|
||||
searchSourceBuilder.query(boolQuery);
|
||||
searchSourceBuilder.from(query.getFrom());
|
||||
searchSourceBuilder.size(query.getSize());
|
||||
// If not using the default sort field (timestamp) add it as a secondary sort
|
||||
if (Result.TIMESTAMP.getPreferredName().equals(query.getSortField()) == false) {
|
||||
searchSourceBuilder.sort(Result.TIMESTAMP.getPreferredName(), query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
searchRequest.source(query.build());
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
|
||||
|
||||
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
|
||||
SearchHits hits = searchResponse.getHits();
|
||||
if (query.getTimestamp() != null) {
|
||||
if (hits.getTotalHits() == 0) {
|
||||
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
|
||||
} else if (hits.getTotalHits() > 1) {
|
||||
LOGGER.error("Found more than one bucket with timestamp [{}] from index {}", query.getTimestamp(), indexName);
|
||||
}
|
||||
}
|
||||
|
||||
List<Bucket> results = new ArrayList<>();
|
||||
for (SearchHit hit : hits.getHits()) {
|
||||
BytesReference source = hit.getSourceRef();
|
||||
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
|
||||
Bucket bucket = Bucket.PARSER.apply(parser, null);
|
||||
if (query.isIncludeInterim() || bucket.isInterim() == false) {
|
||||
results.add(bucket);
|
||||
}
|
||||
results.add(bucket);
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchParseException("failed to parse bucket", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (query.getTimestamp() != null && results.isEmpty()) {
|
||||
if (query.hasTimestamp() && results.isEmpty()) {
|
||||
throw QueryPage.emptyQueryPage(Bucket.RESULTS_FIELD);
|
||||
}
|
||||
|
||||
@ -529,11 +484,11 @@ public class JobProvider {
|
||||
}, e -> errorHandler.accept(mapAuthFailure(e, jobId, GetBucketsAction.NAME))));
|
||||
}
|
||||
|
||||
private void expandBuckets(String jobId, BucketsQuery query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
|
||||
private void expandBuckets(String jobId, BucketsQueryBuilder query, QueryPage<Bucket> buckets, Iterator<Bucket> bucketsToExpand,
|
||||
Consumer<QueryPage<Bucket>> handler, Consumer<Exception> errorHandler, Client client) {
|
||||
if (bucketsToExpand.hasNext()) {
|
||||
Consumer<Integer> c = i -> expandBuckets(jobId, query, buckets, bucketsToExpand, handler, errorHandler, client);
|
||||
expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), query.getPartitionValue(), c, errorHandler, client);
|
||||
expandBucket(jobId, query.isIncludeInterim(), bucketsToExpand.next(), c, errorHandler, client);
|
||||
} else {
|
||||
handler.accept(buckets);
|
||||
}
|
||||
@ -569,43 +524,33 @@ public class JobProvider {
|
||||
// This now gets the first 10K records for a bucket. The rate of records per bucket
|
||||
// is controlled by parameter in the c++ process and its default value is 500. Users may
|
||||
// change that. Issue elastic/machine-learning-cpp#73 is open to prevent this.
|
||||
public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, String partitionFieldValue,
|
||||
Consumer<Integer> consumer, Consumer<Exception> errorHandler, Client client) {
|
||||
public void expandBucket(String jobId, boolean includeInterim, Bucket bucket, Consumer<Integer> consumer,
|
||||
Consumer<Exception> errorHandler, Client client) {
|
||||
Consumer<QueryPage<AnomalyRecord>> h = page -> {
|
||||
bucket.getRecords().addAll(page.results());
|
||||
if (partitionFieldValue != null) {
|
||||
bucket.setAnomalyScore(bucket.partitionAnomalyScore(partitionFieldValue));
|
||||
}
|
||||
consumer.accept(bucket.getRecords().size());
|
||||
};
|
||||
bucketRecords(jobId, bucket, 0, RECORDS_SIZE_PARAM, includeInterim, AnomalyRecord.PROBABILITY.getPreferredName(),
|
||||
false, partitionFieldValue, h, errorHandler, client);
|
||||
false, h, errorHandler, client);
|
||||
}
|
||||
|
||||
void bucketRecords(String jobId, Bucket bucket, int from, int size, boolean includeInterim, String sortField,
|
||||
boolean descending, String partitionFieldValue, Consumer<QueryPage<AnomalyRecord>> handler,
|
||||
boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
|
||||
Consumer<Exception> errorHandler, Client client) {
|
||||
// Find the records using the time stamp rather than a parent-child
|
||||
// relationship. The parent-child filter involves two queries behind
|
||||
// the scenes, and Elasticsearch documentation claims it's significantly
|
||||
// slower. Here we rely on the record timestamps being identical to the
|
||||
// bucket timestamp.
|
||||
QueryBuilder recordFilter = QueryBuilders.termQuery(Result.TIMESTAMP.getPreferredName(), bucket.getTimestamp().getTime());
|
||||
RecordsQueryBuilder recordsQueryBuilder = new RecordsQueryBuilder()
|
||||
.timestamp(bucket.getTimestamp())
|
||||
.from(from)
|
||||
.size(size)
|
||||
.includeInterim(includeInterim)
|
||||
.sortField(sortField)
|
||||
.sortDescending(descending);
|
||||
|
||||
ResultsFilterBuilder builder = new ResultsFilterBuilder(recordFilter).interim(includeInterim);
|
||||
if (partitionFieldValue != null) {
|
||||
builder.term(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue);
|
||||
}
|
||||
recordFilter = builder.build();
|
||||
|
||||
FieldSortBuilder sb = null;
|
||||
if (sortField != null) {
|
||||
sb = new FieldSortBuilder(sortField)
|
||||
.missing("_last")
|
||||
.order(descending ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
|
||||
records(jobId, from, size, recordFilter, sb, SECONDARY_SORT, descending, handler, errorHandler, client);
|
||||
records(jobId, recordsQueryBuilder, handler, errorHandler, client);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -659,55 +604,19 @@ public class JobProvider {
|
||||
|
||||
/**
|
||||
* Search for anomaly records with the parameters in the
|
||||
* {@link org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder.RecordsQuery}
|
||||
* {@link org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder}
|
||||
* Uses a supplied client, so may run as the currently authenticated user
|
||||
*/
|
||||
public void records(String jobId, RecordsQueryBuilder.RecordsQuery query, Consumer<QueryPage<AnomalyRecord>> handler,
|
||||
Consumer<Exception> errorHandler, Client client) {
|
||||
QueryBuilder fb = new ResultsFilterBuilder()
|
||||
.timeRange(Result.TIMESTAMP.getPreferredName(), query.getStart(), query.getEnd())
|
||||
.score(AnomalyRecord.RECORD_SCORE.getPreferredName(), query.getRecordScoreThreshold())
|
||||
.interim(query.isIncludeInterim())
|
||||
.term(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), query.getPartitionFieldValue()).build();
|
||||
FieldSortBuilder sb = null;
|
||||
if (query.getSortField() != null) {
|
||||
sb = new FieldSortBuilder(query.getSortField())
|
||||
.missing("_last")
|
||||
.order(query.isSortDescending() ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
records(jobId, query.getFrom(), query.getSize(), fb, sb, SECONDARY_SORT, query.isSortDescending(), handler, errorHandler, client);
|
||||
}
|
||||
|
||||
/**
|
||||
* The returned records have their id set.
|
||||
*/
|
||||
private void records(String jobId, int from, int size,
|
||||
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
|
||||
boolean descending, Consumer<QueryPage<AnomalyRecord>> handler,
|
||||
public void records(String jobId, RecordsQueryBuilder recordsQueryBuilder, Consumer<QueryPage<AnomalyRecord>> handler,
|
||||
Consumer<Exception> errorHandler, Client client) {
|
||||
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||
|
||||
recordFilter = new BoolQueryBuilder()
|
||||
.filter(recordFilter)
|
||||
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
|
||||
|
||||
SearchSourceBuilder searchSourceBuilder = recordsQueryBuilder.build();
|
||||
SearchRequest searchRequest = new SearchRequest(indexName);
|
||||
searchRequest.indicesOptions(addIgnoreUnavailable(searchRequest.indicesOptions()));
|
||||
searchRequest.source(new SearchSourceBuilder()
|
||||
.from(from)
|
||||
.size(size)
|
||||
.query(recordFilter)
|
||||
.sort(sb == null ? SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC) : sb)
|
||||
.fetchSource(true)
|
||||
);
|
||||
searchRequest.source(recordsQueryBuilder.build());
|
||||
|
||||
for (String sortField : secondarySort) {
|
||||
searchRequest.source().sort(sortField, descending ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
|
||||
LOGGER.trace("ES API CALL: search all of records from index {}{}{} with filter after sort from {} size {}",
|
||||
indexName, (sb != null) ? " with sort" : "",
|
||||
secondarySort.isEmpty() ? "" : " with secondary sort", from, size);
|
||||
LOGGER.trace("ES API CALL: search all of records from index {} with query {}", indexName, searchSourceBuilder);
|
||||
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
|
||||
List<AnomalyRecord> results = new ArrayList<>();
|
||||
for (SearchHit hit : searchResponse.getHits().getHits()) {
|
||||
@ -1015,11 +924,10 @@ public class JobProvider {
|
||||
|
||||
// Step 1. Find the time span of the most recent N bucket results, where N is the number of buckets
|
||||
// required to consider memory usage "established"
|
||||
BucketsQueryBuilder.BucketsQuery bucketQuery = new BucketsQueryBuilder()
|
||||
BucketsQueryBuilder bucketQuery = new BucketsQueryBuilder()
|
||||
.sortField(Result.TIMESTAMP.getPreferredName())
|
||||
.sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1)
|
||||
.includeInterim(false)
|
||||
.build();
|
||||
.includeInterim(false);
|
||||
bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> {
|
||||
if (e instanceof ResourceNotFoundException) {
|
||||
handler.accept(null);
|
||||
|
@ -5,6 +5,21 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermsQueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.FieldSortBuilder;
|
||||
import org.elasticsearch.search.sort.SortBuilders;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
|
||||
import org.elasticsearch.xpack.ml.job.results.Result;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* One time query builder for records. Sets default values for the following
|
||||
* parameters:
|
||||
@ -31,109 +46,105 @@ public final class RecordsQueryBuilder {
|
||||
|
||||
public static final int DEFAULT_SIZE = 100;
|
||||
|
||||
private RecordsQuery recordsQuery = new RecordsQuery();
|
||||
private static final List<String> SECONDARY_SORT = Arrays.asList(
|
||||
AnomalyRecord.RECORD_SCORE.getPreferredName(),
|
||||
AnomalyRecord.OVER_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.BY_FIELD_VALUE.getPreferredName(),
|
||||
AnomalyRecord.FIELD_NAME.getPreferredName(),
|
||||
AnomalyRecord.FUNCTION.getPreferredName()
|
||||
);
|
||||
|
||||
private int from = 0;
|
||||
private int size = DEFAULT_SIZE;
|
||||
private boolean includeInterim = false;
|
||||
private String sortField;
|
||||
private boolean sortDescending = true;
|
||||
private double recordScore = 0.0;
|
||||
private String start;
|
||||
private String end;
|
||||
private Date timestamp;
|
||||
|
||||
public RecordsQueryBuilder from(int from) {
|
||||
recordsQuery.from = from;
|
||||
this.from = from;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder size(int size) {
|
||||
recordsQuery.size = size;
|
||||
this.size = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder epochStart(String startTime) {
|
||||
recordsQuery.start = startTime;
|
||||
this.start = startTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder epochEnd(String endTime) {
|
||||
recordsQuery.end = endTime;
|
||||
this.end = endTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder includeInterim(boolean include) {
|
||||
recordsQuery.includeInterim = include;
|
||||
this.includeInterim = include;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder sortField(String fieldname) {
|
||||
recordsQuery.sortField = fieldname;
|
||||
this.sortField = fieldname;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder sortDescending(boolean sortDescending) {
|
||||
recordsQuery.sortDescending = sortDescending;
|
||||
this.sortDescending = sortDescending;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder recordScore(double recordScore) {
|
||||
recordsQuery.recordScore = recordScore;
|
||||
this.recordScore = recordScore;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQueryBuilder partitionFieldValue(String partitionFieldValue) {
|
||||
recordsQuery.partitionFieldValue = partitionFieldValue;
|
||||
public RecordsQueryBuilder timestamp(Date timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecordsQuery build() {
|
||||
return recordsQuery;
|
||||
}
|
||||
public SearchSourceBuilder build() {
|
||||
QueryBuilder query = new ResultsFilterBuilder()
|
||||
.timeRange(Result.TIMESTAMP.getPreferredName(), start, end)
|
||||
.score(AnomalyRecord.RECORD_SCORE.getPreferredName(), recordScore)
|
||||
.interim(includeInterim)
|
||||
.build();
|
||||
|
||||
public void clear() {
|
||||
recordsQuery = new RecordsQuery();
|
||||
}
|
||||
|
||||
public class RecordsQuery {
|
||||
|
||||
private int from = 0;
|
||||
private int size = DEFAULT_SIZE;
|
||||
private boolean includeInterim = false;
|
||||
private String sortField;
|
||||
private boolean sortDescending = true;
|
||||
private double recordScore = 0.0;
|
||||
private String partitionFieldValue;
|
||||
private String start;
|
||||
private String end;
|
||||
|
||||
|
||||
public int getSize() {
|
||||
return size;
|
||||
FieldSortBuilder sb;
|
||||
if (sortField != null) {
|
||||
sb = new FieldSortBuilder(sortField)
|
||||
.missing("_last")
|
||||
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
} else {
|
||||
sb = SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC);
|
||||
}
|
||||
|
||||
public boolean isIncludeInterim() {
|
||||
return includeInterim;
|
||||
BoolQueryBuilder recordFilter = new BoolQueryBuilder()
|
||||
.filter(query)
|
||||
.filter(new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), AnomalyRecord.RESULT_TYPE_VALUE));
|
||||
if (timestamp != null) {
|
||||
recordFilter.filter(QueryBuilders.termQuery(Result.TIMESTAMP.getPreferredName(), timestamp.getTime()));
|
||||
}
|
||||
|
||||
public String getSortField() {
|
||||
return sortField;
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
|
||||
.from(from)
|
||||
.size(size)
|
||||
.query(recordFilter)
|
||||
.sort(sb)
|
||||
.fetchSource(true);
|
||||
|
||||
for (String sortField : SECONDARY_SORT) {
|
||||
searchSourceBuilder.sort(sortField, sortDescending ? SortOrder.DESC : SortOrder.ASC);
|
||||
}
|
||||
|
||||
public boolean isSortDescending() {
|
||||
return sortDescending;
|
||||
}
|
||||
|
||||
public double getRecordScoreThreshold() {
|
||||
return recordScore;
|
||||
}
|
||||
|
||||
public String getPartitionFieldValue() {
|
||||
return partitionFieldValue;
|
||||
}
|
||||
|
||||
public int getFrom() {
|
||||
return from;
|
||||
}
|
||||
|
||||
public String getStart() {
|
||||
return start;
|
||||
}
|
||||
|
||||
public String getEnd() {
|
||||
return end;
|
||||
}
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,19 +20,19 @@ import java.util.List;
|
||||
* This builder facilitates the creation of a {@link QueryBuilder} with common
|
||||
* characteristics to both buckets and records.
|
||||
*/
|
||||
class ResultsFilterBuilder {
|
||||
public class ResultsFilterBuilder {
|
||||
private final List<QueryBuilder> queries;
|
||||
|
||||
ResultsFilterBuilder() {
|
||||
public ResultsFilterBuilder() {
|
||||
queries = new ArrayList<>();
|
||||
}
|
||||
|
||||
ResultsFilterBuilder(QueryBuilder queryBuilder) {
|
||||
public ResultsFilterBuilder(QueryBuilder queryBuilder) {
|
||||
this();
|
||||
queries.add(queryBuilder);
|
||||
}
|
||||
|
||||
ResultsFilterBuilder timeRange(String field, Object start, Object end) {
|
||||
public ResultsFilterBuilder timeRange(String field, Object start, Object end) {
|
||||
if (start != null || end != null) {
|
||||
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(field);
|
||||
if (start != null) {
|
||||
@ -46,12 +46,12 @@ class ResultsFilterBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
ResultsFilterBuilder timeRange(String field, String timestamp) {
|
||||
public ResultsFilterBuilder timeRange(String field, String timestamp) {
|
||||
addQuery(QueryBuilders.matchQuery(field, timestamp));
|
||||
return this;
|
||||
}
|
||||
|
||||
ResultsFilterBuilder score(String fieldName, double threshold) {
|
||||
public ResultsFilterBuilder score(String fieldName, double threshold) {
|
||||
if (threshold > 0.0) {
|
||||
RangeQueryBuilder scoreFilter = QueryBuilders.rangeQuery(fieldName);
|
||||
scoreFilter.gte(threshold);
|
||||
@ -78,7 +78,7 @@ class ResultsFilterBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
ResultsFilterBuilder term(String fieldName, String fieldValue) {
|
||||
public ResultsFilterBuilder term(String fieldName, String fieldValue) {
|
||||
if (Strings.isNullOrEmpty(fieldName) || Strings.isNullOrEmpty(fieldValue)) {
|
||||
return this;
|
||||
}
|
||||
@ -88,7 +88,7 @@ class ResultsFilterBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
ResultsFilterBuilder resultType(String resultType) {
|
||||
public ResultsFilterBuilder resultType(String resultType) {
|
||||
return term(Result.RESULT_TYPE.getPreferredName(), resultType);
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
resultProcessor.process(builder.buildTestProcess());
|
||||
resultProcessor.awaitCompletion();
|
||||
|
||||
BucketsQueryBuilder.BucketsQuery bucketsQuery = new BucketsQueryBuilder().includeInterim(true).build();
|
||||
BucketsQueryBuilder bucketsQuery = new BucketsQueryBuilder().includeInterim(true);
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(bucketsQuery);
|
||||
assertEquals(1, persistedBucket.count());
|
||||
// Records are not persisted to Elasticsearch as an array within the bucket
|
||||
@ -149,7 +149,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
bucket.setRecords(Collections.emptyList());
|
||||
assertEquals(bucket, persistedBucket.results().get(0));
|
||||
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().build());
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder());
|
||||
assertResultsAreSame(records, persistedRecords);
|
||||
|
||||
QueryPage<Influencer> persistedInfluencers = getInfluencers();
|
||||
@ -190,7 +190,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
resultProcessor.process(resultBuilder.buildTestProcess());
|
||||
resultProcessor.awaitCompletion();
|
||||
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
|
||||
assertEquals(1, persistedBucket.count());
|
||||
// Records are not persisted to Elasticsearch as an array within the bucket
|
||||
// documents, so remove them from the expected bucket before comparing
|
||||
@ -200,7 +200,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
QueryPage<Influencer> persistedInfluencers = getInfluencers();
|
||||
assertEquals(0, persistedInfluencers.count());
|
||||
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true).build());
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true));
|
||||
assertEquals(0, persistedRecords.count());
|
||||
}
|
||||
|
||||
@ -222,14 +222,14 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
resultProcessor.process(resultBuilder.buildTestProcess());
|
||||
resultProcessor.awaitCompletion();
|
||||
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
|
||||
assertEquals(1, persistedBucket.count());
|
||||
// Records are not persisted to Elasticsearch as an array within the bucket
|
||||
// documents, so remove them from the expected bucket before comparing
|
||||
finalBucket.setRecords(Collections.emptyList());
|
||||
assertEquals(finalBucket, persistedBucket.results().get(0));
|
||||
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true).build());
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().includeInterim(true));
|
||||
assertResultsAreSame(finalAnomalyRecords, persistedRecords);
|
||||
}
|
||||
|
||||
@ -246,10 +246,10 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
resultProcessor.process(resultBuilder.buildTestProcess());
|
||||
resultProcessor.awaitCompletion();
|
||||
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true).build());
|
||||
QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
|
||||
assertEquals(1, persistedBucket.count());
|
||||
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().size(200).includeInterim(true).build());
|
||||
QueryPage<AnomalyRecord> persistedRecords = getRecords(new RecordsQueryBuilder().size(200).includeInterim(true));
|
||||
List<AnomalyRecord> allRecords = new ArrayList<>(firstSetOfRecords);
|
||||
allRecords.addAll(secondSetOfRecords);
|
||||
assertResultsAreSame(allRecords, persistedRecords);
|
||||
@ -419,7 +419,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
assertEquals(0, expectedSet.size());
|
||||
}
|
||||
|
||||
private QueryPage<Bucket> getBucketQueryPage(BucketsQueryBuilder.BucketsQuery bucketsQuery) throws Exception {
|
||||
private QueryPage<Bucket> getBucketQueryPage(BucketsQueryBuilder bucketsQuery) throws Exception {
|
||||
AtomicReference<Exception> errorHolder = new AtomicReference<>();
|
||||
AtomicReference<QueryPage<Bucket>> resultHolder = new AtomicReference<>();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
@ -491,7 +491,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||
return resultHolder.get();
|
||||
}
|
||||
|
||||
private QueryPage<AnomalyRecord> getRecords(RecordsQueryBuilder.RecordsQuery recordsQuery) throws Exception {
|
||||
private QueryPage<AnomalyRecord> getRecords(RecordsQueryBuilder recordsQuery) throws Exception {
|
||||
AtomicReference<Exception> errorHolder = new AtomicReference<>();
|
||||
AtomicReference<QueryPage<AnomalyRecord>> resultHolder = new AtomicReference<>();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -1,99 +0,0 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.ml.job.persistence;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class BucketsQueryBuilderTests extends ESTestCase {
|
||||
|
||||
public void testDefaultBuild() throws Exception {
|
||||
BucketsQueryBuilder.BucketsQuery query = new BucketsQueryBuilder().build();
|
||||
|
||||
assertEquals(0, query.getFrom());
|
||||
assertEquals(BucketsQueryBuilder.DEFAULT_SIZE, query.getSize());
|
||||
assertEquals(false, query.isIncludeInterim());
|
||||
assertEquals(false, query.isExpand());
|
||||
assertEquals(0.0, query.getAnomalyScoreFilter(), 0.0001);
|
||||
assertNull(query.getStart());
|
||||
assertNull(query.getEnd());
|
||||
assertEquals("timestamp", query.getSortField());
|
||||
assertFalse(query.isSortDescending());
|
||||
}
|
||||
|
||||
public void testAll() {
|
||||
BucketsQueryBuilder.BucketsQuery query = new BucketsQueryBuilder()
|
||||
.from(20)
|
||||
.size(40)
|
||||
.includeInterim(true)
|
||||
.expand(true)
|
||||
.anomalyScoreThreshold(50.0d)
|
||||
.start("1000")
|
||||
.end("2000")
|
||||
.partitionValue("foo")
|
||||
.sortField("anomaly_score")
|
||||
.sortDescending(true)
|
||||
.build();
|
||||
|
||||
assertEquals(20, query.getFrom());
|
||||
assertEquals(40, query.getSize());
|
||||
assertEquals(true, query.isIncludeInterim());
|
||||
assertEquals(true, query.isExpand());
|
||||
assertEquals(50.0d, query.getAnomalyScoreFilter(), 0.00001);
|
||||
assertEquals("1000", query.getStart());
|
||||
assertEquals("2000", query.getEnd());
|
||||
assertEquals("foo", query.getPartitionValue());
|
||||
assertEquals("anomaly_score", query.getSortField());
|
||||
assertTrue(query.isSortDescending());
|
||||
}
|
||||
|
||||
public void testEqualsHash() {
|
||||
BucketsQueryBuilder query = new BucketsQueryBuilder()
|
||||
.from(20)
|
||||
.size(40)
|
||||
.includeInterim(true)
|
||||
.expand(true)
|
||||
.anomalyScoreThreshold(50.0d)
|
||||
.start("1000")
|
||||
.end("2000")
|
||||
.partitionValue("foo");
|
||||
|
||||
BucketsQueryBuilder query2 = new BucketsQueryBuilder()
|
||||
.from(20)
|
||||
.size(40)
|
||||
.includeInterim(true)
|
||||
.expand(true)
|
||||
.anomalyScoreThreshold(50.0d)
|
||||
.start("1000")
|
||||
.end("2000")
|
||||
.partitionValue("foo");
|
||||
|
||||
assertEquals(query.build(), query2.build());
|
||||
assertEquals(query.build().hashCode(), query2.build().hashCode());
|
||||
query2.clear();
|
||||
assertFalse(query.build().equals(query2.build()));
|
||||
|
||||
query2.from(20)
|
||||
.size(40)
|
||||
.includeInterim(true)
|
||||
.expand(true)
|
||||
.anomalyScoreThreshold(50.0d)
|
||||
.start("1000")
|
||||
.end("2000")
|
||||
.partitionValue("foo");
|
||||
assertEquals(query.build(), query2.build());
|
||||
|
||||
query2.clear();
|
||||
query2.from(20)
|
||||
.size(40)
|
||||
.includeInterim(true)
|
||||
.expand(true)
|
||||
.anomalyScoreThreshold(50.1d)
|
||||
.start("1000")
|
||||
.end("2000")
|
||||
.partitionValue("foo");
|
||||
assertFalse(query.build().equals(query2.build()));
|
||||
}
|
||||
}
|
@ -72,7 +72,6 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
public class JobProviderTests extends ESTestCase {
|
||||
private static final String CLUSTER_NAME = "myCluster";
|
||||
private static final String JOB_ID = "foo";
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCreateJobResultsIndex() {
|
||||
@ -253,7 +252,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<Bucket>[] holder = new QueryPage[1];
|
||||
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
provider.buckets(jobId, bq, r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
QueryPage<Bucket> buckets = holder[0];
|
||||
assertEquals(1L, buckets.count());
|
||||
QueryBuilder query = queryBuilderHolder[0];
|
||||
@ -288,7 +287,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<Bucket>[] holder = new QueryPage[1];
|
||||
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
provider.buckets(jobId, bq, r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
QueryPage<Bucket> buckets = holder[0];
|
||||
assertEquals(1L, buckets.count());
|
||||
QueryBuilder query = queryBuilderHolder[0];
|
||||
@ -325,7 +324,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<Bucket>[] holder = new QueryPage[1];
|
||||
provider.buckets(jobId, bq.build(), r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
provider.buckets(jobId, bq, r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||
QueryPage<Bucket> buckets = holder[0];
|
||||
assertEquals(1L, buckets.count());
|
||||
QueryBuilder query = queryBuilderHolder[0];
|
||||
@ -334,7 +333,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
assertFalse(queryString.matches("(?s).*is_interim.*"));
|
||||
}
|
||||
|
||||
public void testBucket_NoBucketNoExpandNoInterim()
|
||||
public void testBucket_NoBucketNoExpand()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
Long timestamp = 98765432123456789L;
|
||||
@ -348,11 +347,11 @@ public class JobProviderTests extends ESTestCase {
|
||||
BucketsQueryBuilder bq = new BucketsQueryBuilder();
|
||||
bq.timestamp(Long.toString(timestamp));
|
||||
Exception[] holder = new Exception[1];
|
||||
provider.buckets(jobId, bq.build(), q -> {}, e -> holder[0] = e, client);
|
||||
provider.buckets(jobId, bq, q -> {}, e -> holder[0] = e, client);
|
||||
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
||||
}
|
||||
|
||||
public void testBucket_OneBucketNoExpandNoInterim()
|
||||
public void testBucket_OneBucketNoExpand()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
Date now = new Date();
|
||||
@ -373,37 +372,12 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<Bucket>[] bucketHolder = new QueryPage[1];
|
||||
provider.buckets(jobId, bq.build(), q -> bucketHolder[0] = q, e -> {}, client);
|
||||
provider.buckets(jobId, bq, q -> bucketHolder[0] = q, e -> {}, client);
|
||||
assertThat(bucketHolder[0].count(), equalTo(1L));
|
||||
Bucket b = bucketHolder[0].results().get(0);
|
||||
assertEquals(now, b.getTimestamp());
|
||||
}
|
||||
|
||||
public void testBucket_OneBucketNoExpandInterim()
|
||||
throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
Date now = new Date();
|
||||
List<Map<String, Object>> source = new ArrayList<>();
|
||||
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
map.put("job_id", "foo");
|
||||
map.put("timestamp", now.getTime());
|
||||
map.put("bucket_span", 22);
|
||||
map.put("is_interim", true);
|
||||
source.add(map);
|
||||
|
||||
SearchResponse response = createSearchResponse(source);
|
||||
Client client = getMockedClient(queryBuilder -> {}, response);
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
BucketsQueryBuilder bq = new BucketsQueryBuilder();
|
||||
bq.timestamp(Long.toString(now.getTime()));
|
||||
|
||||
Exception[] holder = new Exception[1];
|
||||
provider.buckets(jobId, bq.build(), q -> {}, e -> holder[0] = e, client);
|
||||
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
||||
}
|
||||
|
||||
public void testRecords() throws InterruptedException, ExecutionException, IOException {
|
||||
String jobId = "TestJobIdentification";
|
||||
Date now = new Date();
|
||||
@ -439,7 +413,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
|
||||
provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new, client);
|
||||
provider.records(jobId, rqb, page -> holder[0] = page, RuntimeException::new, client);
|
||||
QueryPage<AnomalyRecord> recordPage = holder[0];
|
||||
assertEquals(2L, recordPage.count());
|
||||
List<AnomalyRecord> records = recordPage.results();
|
||||
@ -493,7 +467,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
|
||||
provider.records(jobId, rqb.build(), page -> holder[0] = page, RuntimeException::new, client);
|
||||
provider.records(jobId, rqb, page -> holder[0] = page, RuntimeException::new, client);
|
||||
QueryPage<AnomalyRecord> recordPage = holder[0];
|
||||
assertEquals(2L, recordPage.count());
|
||||
List<AnomalyRecord> records = recordPage.results();
|
||||
@ -538,7 +512,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
|
||||
provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, "", page -> holder[0] = page, RuntimeException::new,
|
||||
provider.bucketRecords(jobId, bucket, from, size, true, sortfield, true, page -> holder[0] = page, RuntimeException::new,
|
||||
client);
|
||||
QueryPage<AnomalyRecord> recordPage = holder[0];
|
||||
assertEquals(2L, recordPage.count());
|
||||
@ -574,8 +548,7 @@ public class JobProviderTests extends ESTestCase {
|
||||
JobProvider provider = createProvider(client);
|
||||
|
||||
Integer[] holder = new Integer[1];
|
||||
provider.expandBucket(jobId, false, bucket, null, records -> holder[0] = records, RuntimeException::new,
|
||||
client);
|
||||
provider.expandBucket(jobId, false, bucket, records -> holder[0] = records, RuntimeException::new, client);
|
||||
int records = holder[0];
|
||||
assertEquals(400L, records);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user