[ML] Accept more varied Datafeed Aggregations (elastic/x-pack-elasticsearch#2038)

Original commit: elastic/x-pack-elasticsearch@ec1477f41c
David Kyle 2017-07-25 16:45:47 +01:00 committed by GitHub
parent 8f6d9df96e
commit 74d06216c2
15 changed files with 717 additions and 253 deletions
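
For context, here is a minimal sketch (not part of the diff; aggregation and field names are illustrative) of the kind of datafeed aggregation this change now accepts. The date_histogram no longer has to sit at the top level: it may be nested below other bucket aggregations, provided it is the only histogram, has no sibling aggregations, and contains a max aggregation on the datafeed's time field.

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;

class NestedDatafeedAggExample {
    static AggregatorFactories.Builder nestedHistogramAggs() {
        // max on the time field is mandatory so the datafeed can restart without duplicating data
        MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
        DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets")
                .field("time").interval(300000L)
                .subAggregation(maxTime)
                .subAggregation(AggregationBuilders.avg("responsetime").field("responsetime"));
        // the histogram is nested below a terms aggregation, which was rejected before this commit
        TermsAggregationBuilder byAirline = AggregationBuilders.terms("airline").field("airline")
                .subAggregation(dateHistogram);
        return new AggregatorFactories.Builder().addAggregator(byAirline);
    }
}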

View File

@ -25,6 +25,8 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.MlParserType;
import org.elasticsearch.xpack.ml.job.config.Job;
@ -219,32 +221,42 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
/**
* Returns the top level histogram's interval as epoch millis.
* The method expects a valid top level aggregation to exist.
* Returns the histogram's interval as epoch millis.
*/
public long getHistogramIntervalMillis() {
return getHistogramIntervalMillis(aggregations);
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregations.getAggregatorFactories());
return getHistogramIntervalMillis(histogramAggregation);
}
private static long getHistogramIntervalMillis(AggregatorFactories.Builder aggregations) {
AggregationBuilder topLevelAgg = getTopLevelAgg(aggregations);
if (topLevelAgg == null) {
throw new IllegalStateException("No aggregations exist");
}
if (topLevelAgg instanceof HistogramAggregationBuilder) {
return (long) ((HistogramAggregationBuilder) topLevelAgg).interval();
} else if (topLevelAgg instanceof DateHistogramAggregationBuilder) {
return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) topLevelAgg);
private static long getHistogramIntervalMillis(AggregationBuilder histogramAggregation) {
if (histogramAggregation instanceof HistogramAggregationBuilder) {
return (long) ((HistogramAggregationBuilder) histogramAggregation).interval();
} else if (histogramAggregation instanceof DateHistogramAggregationBuilder) {
return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) histogramAggregation);
} else {
throw new IllegalStateException("Invalid top level aggregation [" + topLevelAgg.getName() + "]");
throw new IllegalStateException("Invalid histogram aggregation [" + histogramAggregation.getName() + "]");
}
}
private static AggregationBuilder getTopLevelAgg(AggregatorFactories.Builder aggregations) {
if (aggregations == null || aggregations.getAggregatorFactories().isEmpty()) {
return null;
static AggregationBuilder getHistogramAggregation(List<AggregationBuilder> aggregations) {
if (aggregations.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM));
}
return aggregations.getAggregatorFactories().get(0);
if (aggregations.size() != 1) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS);
}
AggregationBuilder agg = aggregations.get(0);
if (isHistogram(agg)) {
return agg;
} else {
return getHistogramAggregation(agg.getSubAggregations());
}
}
private static boolean isHistogram(AggregationBuilder aggregationBuilder) {
return aggregationBuilder instanceof HistogramAggregationBuilder
|| aggregationBuilder instanceof DateHistogramAggregationBuilder;
}
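
For orientation, a small usage sketch (mirroring DatafeedConfigTests further down; identifiers are illustrative) of how getHistogramAggregation resolves a nested histogram:

DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
TermsAggregationBuilder topLevelTerms = AggregationBuilders.terms("top_level").subAggregation(dateHistogram);
// Descends through the single aggregation at each level until it reaches the histogram.
// An empty list, or a histogram with sibling aggregations, results in a 400 bad request.
AggregationBuilder histogram = DatafeedConfig.getHistogramAggregation(
        new AggregatorFactories.Builder().addAggregator(topLevelTerms).getAggregatorFactories());
// histogram == dateHistogram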
/**
@ -545,7 +557,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
chunkingConfig);
}
private void validateAggregations() {
void validateAggregations() {
if (aggregations == null) {
return;
}
@ -557,17 +569,45 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (aggregatorFactories.isEmpty()) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
}
AggregationBuilder topLevelAgg = aggregatorFactories.get(0);
if (topLevelAgg instanceof HistogramAggregationBuilder) {
if (((HistogramAggregationBuilder) topLevelAgg).interval() <= 0) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregatorFactories);
checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations());
checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation);
checkHistogramIntervalIsPositive(histogramAggregation);
}
private static void checkNoMoreHistogramAggregations(List<AggregationBuilder> aggregations) {
for (AggregationBuilder agg : aggregations) {
if (isHistogram(agg)) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM);
}
} else if (topLevelAgg instanceof DateHistogramAggregationBuilder) {
if (validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) topLevelAgg) <= 0) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
checkNoMoreHistogramAggregations(agg.getSubAggregations());
}
}
static void checkHistogramAggregationHasChildMaxTimeAgg(AggregationBuilder histogramAggregation) {
String timeField = null;
if (histogramAggregation instanceof ValuesSourceAggregationBuilder) {
timeField = ((ValuesSourceAggregationBuilder) histogramAggregation).field();
}
for (AggregationBuilder agg : histogramAggregation.getSubAggregations()) {
if (agg instanceof MaxAggregationBuilder) {
MaxAggregationBuilder maxAgg = (MaxAggregationBuilder)agg;
if (maxAgg.field().equals(timeField)) {
return;
}
}
} else {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM);
}
throw ExceptionsHelper.badRequestException(
Messages.getMessage(Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION, timeField));
}
private static void checkHistogramIntervalIsPositive(AggregationBuilder histogramAggregation) {
long interval = getHistogramIntervalMillis(histogramAggregation);
if (interval <= 0) {
throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO);
}
}
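
By way of a hedged example (again not part of the diff; the builder calls mirror the tests below), a datafeed whose date_histogram has no nested max aggregation on its own time field is now rejected when the config is built:

DateHistogramAggregationBuilder noMaxTime =
        AggregationBuilders.dateHistogram("buckets").field("time").interval(300000L);
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1");
datafeed.setIndices(Collections.singletonList("myIndex"));
datafeed.setTypes(Collections.singletonList("myType"));
datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(noMaxTime));
// datafeed.build() throws a 400 ElasticsearchStatusException:
// "Date histogram must have nested max aggregation for time_field [time]"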
@ -576,7 +616,8 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
} else {
long histogramIntervalMillis = getHistogramIntervalMillis(aggregations);
AggregationBuilder histogramAggregation = getHistogramAggregation(aggregations.getAggregatorFactories());
long histogramIntervalMillis = getHistogramIntervalMillis(histogramAggregation);
chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis(
DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis));
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
public final class DatafeedJobValidator {
@ -23,7 +24,7 @@ public final class DatafeedJobValidator {
public static void validate(DatafeedConfig datafeedConfig, Job job) {
AnalysisConfig analysisConfig = job.getAnalysisConfig();
if (analysisConfig.getLatency() != null && analysisConfig.getLatency().seconds() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
}
if (datafeedConfig.hasAggregations()) {
checkSummaryCountFieldNameIsSet(analysisConfig);
@ -33,7 +34,7 @@ public final class DatafeedJobValidator {
private static void checkSummaryCountFieldNameIsSet(AnalysisConfig analysisConfig) {
if (Strings.isNullOrEmpty(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException(Messages.getMessage(
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD));
}
}
@ -42,7 +43,7 @@ public final class DatafeedJobValidator {
long histogramIntervalMillis = datafeedConfig.getHistogramIntervalMillis();
long bucketSpanMillis = analysisConfig.getBucketSpan().millis();
if (histogramIntervalMillis > bucketSpanMillis) {
throw new IllegalArgumentException(Messages.getMessage(
throw ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_LESS_OR_EQUAL_TO_BUCKET_SPAN,
TimeValue.timeValueMillis(histogramIntervalMillis).getStringRep(),
TimeValue.timeValueMillis(bucketSpanMillis).getStringRep()));
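
To make the check above concrete (values borrowed from DatafeedJobValidatorTests later in this commit), the histogram interval must not exceed the job's bucket_span, and the failure is now reported as a 400 rather than an IllegalArgumentException:

long histogramIntervalMillis = 1800001L;   // from the datafeed's histogram aggregation
long bucketSpanMillis = 1800000L;          // from the job's analysis_config
if (histogramIntervalMillis > bucketSpanMillis) {
    // throws ExceptionsHelper.badRequestException(...):
    // "Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]"
}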

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
@ -22,8 +21,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
@ -53,14 +50,15 @@ class AggregationDataExtractor implements DataExtractor {
private final AggregationDataExtractorContext context;
private boolean hasNext;
private boolean isCancelled;
private LinkedList<Histogram.Bucket> histogramBuckets;
private AggregationToJsonProcessor aggregationToJsonProcessor;
private ByteArrayOutputStream outputStream;
AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
this.context = Objects.requireNonNull(dataExtractorContext);
this.hasNext = true;
this.isCancelled = false;
this.histogramBuckets = null;
context = Objects.requireNonNull(dataExtractorContext);
hasNext = true;
isCancelled = false;
outputStream = new ByteArrayOutputStream();
}
@Override
@ -85,20 +83,30 @@ class AggregationDataExtractor implements DataExtractor {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (histogramBuckets == null) {
histogramBuckets = search();
if (aggregationToJsonProcessor == null) {
Aggregations aggs = search();
if (aggs == null) {
hasNext = false;
return Optional.empty();
}
initAggregationProcessor(aggs);
}
return Optional.ofNullable(processNextBatch());
}
private LinkedList<Histogram.Bucket> search() throws IOException {
if (histogramBuckets != null) {
throw new IllegalStateException("search should only be performed once");
}
private Aggregations search() throws IOException {
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return new LinkedList<>(getHistogramBuckets(searchResponse.getAggregations()));
return validateAggs(searchResponse.getAggregations());
}
private void initAggregationProcessor(Aggregations aggs) throws IOException {
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount);
aggregationToJsonProcessor.process(aggs);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
@ -117,43 +125,26 @@ class AggregationDataExtractor implements DataExtractor {
return searchRequestBuilder;
}
private List<? extends Histogram.Bucket> getHistogramBuckets(@Nullable Aggregations aggs) {
private Aggregations validateAggs(@Nullable Aggregations aggs) {
if (aggs == null) {
return Collections.emptyList();
return null;
}
List<Aggregation> aggsAsList = aggs.asList();
if (aggsAsList.isEmpty()) {
return Collections.emptyList();
return null;
}
if (aggsAsList.size() > 1) {
throw new IllegalArgumentException("Multiple top level aggregations not supported; found: "
+ aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList()));
}
Aggregation topAgg = aggsAsList.get(0);
if (topAgg instanceof Histogram) {
return ((Histogram) topAgg).getBuckets();
} else {
throw new IllegalArgumentException("Top level aggregation should be [histogram]");
}
return aggs;
}
private InputStream processNextBatch() throws IOException {
if (histogramBuckets.isEmpty()) {
hasNext = false;
return null;
}
outputStream.reset();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
context.timeField, context.fields, context.includeDocCount, outputStream)) {
while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
processor.process(histogramBuckets.removeFirst());
}
if (histogramBuckets.isEmpty()) {
hasNext = false;
}
}
hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
}
}
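
A rough sketch of how a caller might drive the reworked extractor (assumed usage, not part of the diff): the first call to next() runs the single aggregated search and feeds the whole response into an AggregationToJsonProcessor; later calls only drain the queued buckets in batches.

static void drainExtractor(AggregationDataExtractor extractor) throws IOException {
    while (extractor.hasNext()) {
        // Each batch targets BATCH_KEY_VALUE_PAIRS key-value pairs; buckets are written
        // whole, so a batch may overshoot that limit slightly.
        Optional<InputStream> batch = extractor.next();
        batch.ifPresent(jsonDocs -> { /* post the flat JSON documents to the ML job */ });
    }
}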

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.Aggregation;
@ -18,6 +17,7 @@ import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import java.io.IOException;
import java.io.OutputStream;
@ -29,20 +29,22 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
* In order to ensure that datafeeds can restart without duplicating data, we require that
* each histogram bucket has a nested max aggregation matching the time_field.
*/
class AggregationToJsonProcessor implements Releasable {
class AggregationToJsonProcessor {
private final String timeField;
private final Set<String> fields;
private final boolean includeDocCount;
private final XContentBuilder jsonBuilder;
private final Map<String, Object> keyValuePairs;
private final LinkedHashMap<String, Object> keyValuePairs;
private long keyValueWrittenCount;
private SortedMap<Long, List<Map<String, Object>>> docsByBucketTimestamp;
/**
* Constructs a processor that processes aggregations into JSON
@ -50,21 +52,23 @@ class AggregationToJsonProcessor implements Releasable {
* @param timeField the time field
* @param fields the fields to convert into JSON
* @param includeDocCount whether to include the doc_count
* @param outputStream the stream to write the output
* @throws IOException if an error occurs during the processing
*/
AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, OutputStream outputStream)
AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount)
throws IOException {
this.timeField = Objects.requireNonNull(timeField);
this.fields = Objects.requireNonNull(fields);
this.includeDocCount = includeDocCount;
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
keyValuePairs = new LinkedHashMap<>();
docsByBucketTimestamp = new TreeMap<>();
keyValueWrittenCount = 0;
}
public void process(Aggregations aggs) throws IOException {
processAggs(0, aggs.asList());
}
/**
* Processes a {@link Histogram.Bucket} and writes a flat JSON document for each of its leaf aggregations.
* Processes a list of {@link Aggregation}s and writes a flat JSON document for each of its leaf aggregations.
* Supported sub-aggregations include:
* <ul>
* <li>{@link MultiBucketsAggregation}</li>
@ -72,70 +76,129 @@ class AggregationToJsonProcessor implements Releasable {
* <li>{@link Percentiles}</li>
* </ul>
*/
public void process(Histogram.Bucket bucket) throws IOException {
if (bucket.getDocCount() == 0) {
return;
}
Aggregations aggs = bucket.getAggregations();
Aggregation timeAgg = aggs == null ? null : aggs.get(timeField);
if (timeAgg instanceof Max == false) {
throw new IllegalArgumentException("Missing max aggregation for time_field [" + timeField + "]");
}
// We want to handle the max time aggregation only at the bucket level.
// So, we add the value here and then remove the aggregation before
// processing the rest of the sub aggs.
long timestamp = (long) ((Max) timeAgg).value();
keyValuePairs.put(timeField, timestamp);
List<Aggregation> subAggs = new ArrayList<>(aggs.asList());
subAggs.remove(timeAgg);
processNestedAggs(bucket.getDocCount(), subAggs);
}
private void processNestedAggs(long docCount, List<Aggregation> aggs) throws IOException {
if (aggs.isEmpty()) {
private void processAggs(long docCount, List<Aggregation> aggregations) throws IOException {
if (aggregations.isEmpty()) {
// This means we reached a bucket aggregation without sub-aggs. Thus, we can flush the path written so far.
writeJsonObject(docCount);
queueDocToWrite(keyValuePairs, docCount);
return;
}
boolean processedBucketAgg = false;
List<Aggregation> leafAggregations = new ArrayList<>();
List<MultiBucketsAggregation> bucketAggregations = new ArrayList<>();
// Sort into leaf and bucket aggregations.
// The leaf aggregations will be processed first.
for (Aggregation agg : aggregations) {
if (agg instanceof MultiBucketsAggregation) {
bucketAggregations.add((MultiBucketsAggregation)agg);
} else {
leafAggregations.add(agg);
}
}
if (bucketAggregations.size() > 1) {
throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
}
List<String> addedLeafKeys = new ArrayList<>();
for (Aggregation agg : aggs) {
if (fields.contains(agg.getName())) {
if (agg instanceof MultiBucketsAggregation) {
if (processedBucketAgg) {
throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
}
if (addedLeafKeys.isEmpty() == false) {
throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported");
}
processedBucketAgg = true;
processBucket((MultiBucketsAggregation) agg);
} else {
if (processedBucketAgg) {
throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported");
}
String addedKey = processLeaf(agg);
if (addedKey != null) {
addedLeafKeys.add(addedKey);
}
for (Aggregation leafAgg : leafAggregations) {
if (timeField.equals(leafAgg.getName())) {
processTimeField(leafAgg);
} else if (fields.contains(leafAgg.getName())) {
boolean leafAdded = processLeaf(leafAgg);
if (leafAdded) {
addedLeafKeys.add(leafAgg.getName());
}
}
}
if (addedLeafKeys.isEmpty() == false) {
writeJsonObject(docCount);
addedLeafKeys.forEach(k -> keyValuePairs.remove(k));
boolean noMoreBucketsToProcess = bucketAggregations.isEmpty();
if (noMoreBucketsToProcess == false) {
MultiBucketsAggregation bucketAgg = bucketAggregations.get(0);
if (bucketAgg instanceof Histogram) {
processDateHistogram((Histogram) bucketAgg);
} else {
// Ignore bucket aggregations that don't contain a field we
// are interested in. This avoids problems with pipeline bucket
// aggregations where we would create extra docs traversing a
// bucket agg that isn't used but is required for the pipeline agg.
if (bucketAggContainsRequiredAgg(bucketAgg)) {
processBucket(bucketAgg, fields.contains(bucketAgg.getName()));
} else {
noMoreBucketsToProcess = true;
}
}
}
// If there are no more bucket aggregations to process we've reached the end
// and it's time to write the doc
if (noMoreBucketsToProcess) {
queueDocToWrite(keyValuePairs, docCount);
}
addedLeafKeys.forEach(k -> keyValuePairs.remove(k));
}
private void processDateHistogram(Histogram agg) throws IOException {
if (keyValuePairs.containsKey(timeField)) {
throw new IllegalArgumentException("More than one Date histogram cannot be used in the aggregation. " +
"[" + agg.getName() + "] is another instance of a Date histogram");
}
for (Histogram.Bucket bucket : agg.getBuckets()) {
List<Aggregation> childAggs = bucket.getAggregations().asList();
processAggs(bucket.getDocCount(), childAggs);
keyValuePairs.remove(timeField);
}
}
private void processBucket(MultiBucketsAggregation bucketAgg) throws IOException {
private void processTimeField(Aggregation agg) {
if (agg instanceof Max == false) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD, timeField));
}
long timestamp = (long) ((Max) agg).value();
keyValuePairs.put(timeField, timestamp);
}
boolean bucketAggContainsRequiredAgg(MultiBucketsAggregation aggregation) {
if (fields.contains(aggregation.getName())) {
return true;
}
if (aggregation.getBuckets().isEmpty()) {
return false;
}
boolean foundRequiredAgg = false;
List<Aggregation> aggs = asList(aggregation.getBuckets().get(0).getAggregations());
for (Aggregation agg : aggs) {
if (fields.contains(agg.getName())) {
foundRequiredAgg = true;
break;
}
if (agg instanceof MultiBucketsAggregation) {
foundRequiredAgg = bucketAggContainsRequiredAgg((MultiBucketsAggregation) agg);
if (foundRequiredAgg) {
break;
}
}
}
return foundRequiredAgg;
}
private void processBucket(MultiBucketsAggregation bucketAgg, boolean addField) throws IOException {
for (MultiBucketsAggregation.Bucket bucket : bucketAgg.getBuckets()) {
keyValuePairs.put(bucketAgg.getName(), bucket.getKey());
processNestedAggs(bucket.getDocCount(), asList(bucket.getAggregations()));
keyValuePairs.remove(bucketAgg.getName());
if (addField) {
keyValuePairs.put(bucketAgg.getName(), bucket.getKey());
}
processAggs(bucket.getDocCount(), asList(bucket.getAggregations()));
if (addField) {
keyValuePairs.remove(bucketAgg.getName());
}
}
}
@ -143,8 +206,7 @@ class AggregationToJsonProcessor implements Releasable {
* Adds a leaf key-value pair. Returns {@code true} if a value was added, {@code false} otherwise.
* Non-finite metric values are not added.
*/
@Nullable
private String processLeaf(Aggregation agg) throws IOException {
private boolean processLeaf(Aggregation agg) throws IOException {
if (agg instanceof NumericMetricsAggregation.SingleValue) {
return processSingleValue((NumericMetricsAggregation.SingleValue) agg);
} else if (agg instanceof Percentiles) {
@ -154,46 +216,88 @@ class AggregationToJsonProcessor implements Releasable {
}
}
private String processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException {
private boolean processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException {
return addMetricIfFinite(singleValue.getName(), singleValue.value());
}
@Nullable
private String addMetricIfFinite(String key, double value) {
private boolean addMetricIfFinite(String key, double value) {
if (Double.isFinite(value)) {
keyValuePairs.put(key, value);
return key;
return true;
}
return null;
return false;
}
private String processPercentiles(Percentiles percentiles) throws IOException {
private boolean processPercentiles(Percentiles percentiles) throws IOException {
Iterator<Percentile> percentileIterator = percentiles.iterator();
String addedKey = addMetricIfFinite(percentiles.getName(), percentileIterator.next().getValue());
boolean aggregationAdded = addMetricIfFinite(percentiles.getName(), percentileIterator.next().getValue());
if (percentileIterator.hasNext()) {
throw new IllegalArgumentException("Multi-percentile aggregation [" + percentiles.getName() + "] is not supported");
}
return addedKey;
return aggregationAdded;
}
private void writeJsonObject(long docCount) throws IOException {
private void queueDocToWrite(Map<String, Object> doc, long docCount) {
if (docCount > 0) {
jsonBuilder.startObject();
for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) {
jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
keyValueWrittenCount++;
}
Map<String, Object> copy = new LinkedHashMap<>(doc);
if (includeDocCount) {
jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount);
keyValueWrittenCount++;
copy.put(DatafeedConfig.DOC_COUNT, docCount);
}
jsonBuilder.endObject();
Long timeStamp = (Long) copy.get(timeField);
if (timeStamp == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD, timeField));
}
docsByBucketTimestamp.computeIfAbsent(timeStamp, (t) -> new ArrayList<>()).add(copy);
}
}
@Override
public void close() {
jsonBuilder.close();
/**
* Writes the aggregated documents one bucket at a time until at least {@code batchSize}
* key-value pairs have been written. Buckets are written in their entirety and the
* {@code batchSize} check runs only after a whole bucket has been written, so more than
* {@code batchSize} key-value pairs may be written.
* The method should be called repeatedly until it returns false; at that point there
* are no more documents to write.
*
* @param batchSize The number of key-value pairs to write.
* @return True if there are any more documents to write after the call.
* False if there are no documents to write.
* @throws IOException If an error occurs serialising the JSON
*/
boolean writeDocs(int batchSize, OutputStream outputStream) throws IOException {
if (docsByBucketTimestamp.isEmpty()) {
return false;
}
try (XContentBuilder jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream)) {
long previousWrittenCount = keyValueWrittenCount;
Iterator<Map.Entry<Long, List<Map<String, Object>>>> iterator = docsByBucketTimestamp.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, List<Map<String, Object>>> entry = iterator.next();
for (Map<String, Object> map : entry.getValue()) {
writeJsonObject(jsonBuilder, map);
}
iterator.remove();
if (keyValueWrittenCount - previousWrittenCount >= batchSize) {
break;
}
}
}
return docsByBucketTimestamp.isEmpty() == false;
}
private void writeJsonObject(XContentBuilder jsonBuilder, Map<String, Object> record) throws IOException {
jsonBuilder.startObject();
for (Map.Entry<String, Object> keyValue : record.entrySet()) {
jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
keyValueWrittenCount++;
}
jsonBuilder.endObject();
}
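
Putting the pieces together, a hedged usage sketch of this class (mirroring how AggregationDataExtractor uses it above; the field names, batch size, and example output are illustrative):

AggregationToJsonProcessor processor =
        new AggregationToJsonProcessor("time", Sets.newHashSet("airline", "responsetime"), true);
processor.process(searchResponse.getAggregations());   // queue docs, keyed by bucket timestamp
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
boolean hasMore = true;
while (hasMore) {
    outputStream.reset();
    hasMore = processor.writeDocs(1000, outputStream);
    // outputStream now holds one flat JSON object per aggregated document, e.g.
    // {"time":1000,"airline":"a","responsetime":11.0,"doc_count":3}
}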
/**

View File

@ -23,11 +23,18 @@ public final class Messages {
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM =
"A top level date_histogram (or histogram) aggregation is required";
"A date_histogram (or histogram) aggregation is required";
public static final String DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM =
"Aggregations can only have 1 date_histogram or histogram aggregation";
public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS =
"The date_histogram (or histogram) aggregation cannot have sibling aggregations";
public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO =
"Aggregation interval must be greater than 0";
public static final String DATAFEED_AGGREGATIONS_INTERVAL_MUST_LESS_OR_EQUAL_TO_BUCKET_SPAN =
"Aggregation interval [{0}] must be less than or equal to the bucket_span [{1}]";
public static final String DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION =
"Date histogram must have nested max aggregation for time_field [{0}]";
public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]";
public static final String INCONSISTENT_ID =
"Inconsistent {0}; ''{1}'' specified in the body differs from ''{2}'' specified as a URL argument";

View File

@ -13,11 +13,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -33,7 +29,6 @@ import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.config.JobTests;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
@ -247,7 +242,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
MlMetadata.Builder builder = new MlMetadata.Builder();
builder.putJob(job1.build(now), false);
expectThrows(IllegalArgumentException.class, () -> builder.putDatafeed(datafeedConfig1));
expectThrows(ElasticsearchStatusException.class, () -> builder.putDatafeed(datafeedConfig1));
}
public void testUpdateDatafeed() {

View File

@ -17,10 +17,14 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
@ -73,7 +77,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
long interval = randomNonNegativeLong();
interval = interval > bucketSpanMillis ? bucketSpanMillis : interval;
interval = interval <= 0 ? 1 : interval;
aggs.addAggregator(AggregationBuilders.dateHistogram("time").interval(interval));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets").interval(interval).subAggregation(maxTime).field("time"));
builder.setAggregations(aggs);
}
if (randomBoolean()) {
@ -247,8 +252,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(300000)));
AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time")));
DatafeedConfig datafeedConfig = builder.build();
assertThat(datafeedConfig.hasAggregations(), is(true));
@ -262,25 +268,17 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
assertThat(e.getMessage(), equalTo("A top level date_histogram (or histogram) aggregation is required"));
}
public void testBuild_GivenTopLevelAggIsTerms() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.terms("foo")));
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
assertThat(e.getMessage(), equalTo("A top level date_histogram (or histogram) aggregation is required"));
assertThat(e.getMessage(), equalTo("A date_histogram (or histogram) aggregation is required"));
}
public void testBuild_GivenHistogramWithDefaultInterval() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.histogram("time")));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time").subAggregation(maxTime).field("time"))
);
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
@ -288,8 +286,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
}
public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time")
.interval(300000L).timeZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("EST")));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("bucket").field("time")
.interval(300000L).timeZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("EST"))).subAggregation(maxTime);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram(dateHistogram));
@ -336,17 +335,101 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
public void testChunkingConfig_GivenExplicitSetting() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(createDatafeedWithDateHistogram("30s"));
builder.setChunkingConfig(ChunkingConfig.newAuto());
assertThat(builder.build().getChunkingConfig(), equalTo(ChunkingConfig.newAuto()));
}
public void testCheckHistogramAggregationHasChildMaxTimeAgg() {
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time_agg").field("max_time");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.Builder.checkHistogramAggregationHasChildMaxTimeAgg(dateHistogram));
assertThat(e.getMessage(), containsString("Date histogram must have nested max aggregation for time_field [max_time]"));
}
public void testValidateAggregations_GivenMultipleHistogramAggs() {
DateHistogramAggregationBuilder nestedDateHistogram = AggregationBuilders.dateHistogram("nested_time");
AvgAggregationBuilder avg = AggregationBuilders.avg("avg").subAggregation(nestedDateHistogram);
TermsAggregationBuilder nestedTerms = AggregationBuilders.terms("nested_terms");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
AggregationBuilder histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
dateHistogram.subAggregation(avg).subAggregation(nestedTerms).subAggregation(maxTime).field("time");
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms));
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> builder.validateAggregations());
assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
}
public void testGetHistogramAggregation_MissingHistogramAgg() {
TermsAggregationBuilder terms = AggregationBuilders.terms("top_level");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("A date_histogram (or histogram) aggregation is required", e.getMessage());
}
public void testGetHistogramAggregation_DateHistogramHasSibling() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(avg).addAggregator(dateHistogram).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
TermsAggregationBuilder terms = AggregationBuilders.terms("terms");
terms.subAggregation(dateHistogram);
terms.subAggregation(avg);
e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(terms).getAggregatorFactories()));
assertEquals("The date_histogram (or histogram) aggregation cannot have sibling aggregations", e.getMessage());
}
public void testGetHistogramAggregation() {
AvgAggregationBuilder avg = AggregationBuilders.avg("avg");
TermsAggregationBuilder nestedTerms = AggregationBuilders.terms("nested_terms");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
AggregationBuilder histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
dateHistogram.subAggregation(avg).subAggregation(nestedTerms);
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(dateHistogram).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);
histogramAggregationBuilder = DatafeedConfig.getHistogramAggregation(
new AggregatorFactories.Builder().addAggregator(toplevelTerms).getAggregatorFactories());
assertEquals(dateHistogram, histogramAggregationBuilder);
}
public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
}
private static DatafeedConfig createDatafeedWithDateHistogram(String interval) {
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets").subAggregation(maxTime).field("time");
if (interval != null) {
dateHistogram.dateHistogramInterval(new DateHistogramInterval(interval));
}
@ -354,7 +437,8 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
}
private static DatafeedConfig createDatafeedWithDateHistogram(Long interval) {
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets").subAggregation(maxTime).field("time");
if (interval != null) {
dateHistogram.interval(interval);
}

View File

@ -5,10 +5,12 @@
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
@ -33,7 +35,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfig().build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobValidator.validate(datafeedConfig, job));
assertEquals(errorMessage, e.getMessage());
@ -73,7 +75,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobValidator.validate(datafeedConfig, job));
assertEquals(errorMessage, e.getMessage());
@ -90,7 +92,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobValidator.validate(datafeedConfig, job));
assertEquals(errorMessage, e.getMessage());
@ -116,7 +118,7 @@ public class DatafeedJobValidatorTests extends ESTestCase {
Job job = builder.build(new Date());
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs(1800001.0).build();
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class,
() -> DatafeedJobValidator.validate(datafeedConfig, job));
assertEquals("Aggregation interval [1800001ms] must be less than or equal to the bucket_span [1800000ms]", e.getMessage());
@ -139,7 +141,9 @@ public class DatafeedJobValidatorTests extends ESTestCase {
}
private static DatafeedConfig.Builder createValidDatafeedConfigWithAggs(double interval) throws IOException {
HistogramAggregationBuilder histogram = AggregationBuilders.histogram("time").interval(interval);
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
HistogramAggregationBuilder histogram =
AggregationBuilders.histogram("time").interval(interval).field("time").subAggregation(maxTime);
DatafeedConfig.Builder datafeedConfig = createValidDatafeedConfig();
datafeedConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(histogram));
return datafeedConfig;

View File

@ -15,6 +15,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -166,8 +167,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
DatafeedConfig datafeed = datafeedBuilder.build();
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
update.setAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("a").interval(300000)));
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime)));
DatafeedConfig updatedDatafeed = update.build().apply(datafeed);
@ -175,6 +177,6 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
assertThat(updatedDatafeed.getTypes(), equalTo(Arrays.asList("t_1")));
assertThat(updatedDatafeed.getAggregations(),
equalTo(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("a").interval(300000))));
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
@ -108,8 +109,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
@ -126,8 +128,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)),
@ -143,8 +146,9 @@ public class DataExtractorFactoryTests extends ESTestCase {
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000)));
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(

View File

@ -16,7 +16,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -130,13 +129,14 @@ public class AggregationDataExtractorTests extends ESTestCase {
}
public void testExtractionGivenMultipleBatches() throws IOException {
// Each bucket is 2 key-value pairs, thus 1200 buckets will be 2400
// key-value pairs. They should be processed in 3 batches.
int buckets = 1200;
// Each bucket is 4 key-value pairs and there are 2 terms, thus 600 buckets will be 600 * 4 * 2 = 4800
// key-value pairs. They should be processed in 5 batches.
int buckets = 600;
List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
long timestamp = 1000;
for (int i = 0; i < buckets; i++) {
histogramBuckets.add(createHistogramBucket(timestamp, 3, Collections.singletonList(createMax("time", timestamp))));
histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp),
createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0)))));
timestamp += 1000L;
}
@ -146,9 +146,13 @@ public class AggregationDataExtractorTests extends ESTestCase {
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(200L));
assertThat(extractor.hasNext(), is(false));
@ -182,19 +186,6 @@ public class AggregationDataExtractorTests extends ESTestCase {
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenResponseHasInvalidTopLevelAgg() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
Terms termsAgg = mock(Terms.class);
Aggregations aggs = AggregationTestUtils.createAggs(Collections.singletonList(termsAgg));
SearchResponse response = createSearchResponse(aggs);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, extractor::next);
assertThat(e.getMessage(), containsString("Top level aggregation should be [histogram]"));
}
public void testExtractionGivenResponseHasMultipleTopLevelAggs() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
@ -226,7 +217,8 @@ public class AggregationDataExtractorTests extends ESTestCase {
List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
long timestamp = 1000;
for (int i = 0; i < buckets; i++) {
histogramBuckets.add(createHistogramBucket(timestamp, 3, Collections.singletonList(createMax("time", timestamp))));
histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp),
createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0)))));
timestamp += 1000L;
}
@ -236,9 +228,9 @@ public class AggregationDataExtractorTests extends ESTestCase {
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L));
assertThat(extractor.hasNext(), is(true));
extractor.cancel();

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -29,21 +30,28 @@ public final class AggregationTestUtils {
private AggregationTestUtils() {}
static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, List<Aggregation> subAggregations) {
Histogram.Bucket bucket = createHistogramBucket(timestamp, docCount);
Histogram.Bucket bucket = mock(Histogram.Bucket.class);
when(bucket.getKey()).thenReturn(timestamp);
when(bucket.getDocCount()).thenReturn(docCount);
Aggregations aggs = createAggs(subAggregations);
when(bucket.getAggregations()).thenReturn(aggs);
return bucket;
}
static Aggregations createAggs(List<Aggregation> aggsList) {
return new Aggregations(aggsList) {};
static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
return createHistogramBucket(timestamp, docCount, Collections.emptyList());
}
static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
Histogram.Bucket bucket = mock(Histogram.Bucket.class);
when(bucket.getKey()).thenReturn(timestamp);
when(bucket.getDocCount()).thenReturn(docCount);
return bucket;
static Aggregations createAggs(List<Aggregation> aggsList) {
return new Aggregations(aggsList);
}
@SuppressWarnings("unchecked")
static Histogram createHistogramAggregation(String name, List histogramBuckets) {
Histogram histogram = mock(Histogram.class);
when((List<Histogram.Bucket>)histogram.getBuckets()).thenReturn(histogramBuckets);
when(histogram.getName()).thenReturn(name);
return histogram;
}
static Max createMax(String name, double value) {
@ -71,12 +79,16 @@ public final class AggregationTestUtils {
when(bucket.getKey()).thenReturn(term.key);
when(bucket.getDocCount()).thenReturn(term.count);
List<Aggregation> numericAggs = new ArrayList<>();
for (Map.Entry<String, Double> keyValue : term.values.entrySet()) {
numericAggs.add(createSingleValue(keyValue.getKey(), keyValue.getValue()));
}
if (!numericAggs.isEmpty()) {
Aggregations aggs = createAggs(numericAggs);
when(bucket.getAggregations()).thenReturn(aggs);
if (term.hasBucketAggs()) {
when(bucket.getAggregations()).thenReturn(createAggs(term.bucketAggs));
} else {
for (Map.Entry<String, Double> keyValue : term.values.entrySet()) {
numericAggs.add(createSingleValue(keyValue.getKey(), keyValue.getValue()));
}
if (!numericAggs.isEmpty()) {
Aggregations aggs = createAggs(numericAggs);
when(bucket.getAggregations()).thenReturn(aggs);
}
}
buckets.add(bucket);
}
@ -101,6 +113,7 @@ public final class AggregationTestUtils {
String key;
long count;
Map<String, Double> values;
List<Aggregation> bucketAggs;
Term(String key, long count) {
this(key, count, Collections.emptyMap());
@ -116,6 +129,15 @@ public final class AggregationTestUtils {
this.values = values;
}
Term(String key, long count, List<Aggregation> bucketAggs) {
this(key, count);
this.bucketAggs = bucketAggs;
}
private boolean hasBucketAggs() {
return bucketAggs != null;
}
private static Map<String, Double> newKeyValue(String key, Double value) {
Map<String, Double> keyValue = new HashMap<>();
keyValue.put(key, value);

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.test.ESTestCase;
@ -18,6 +19,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -25,6 +27,7 @@ import java.util.Set;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramAggregation;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
@ -39,6 +42,24 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
private long keyValuePairsWritten = 0;
public void testProcessGivenMultipleDateHistograms() {
List<Histogram.Bucket> nestedHistogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Collections.singletonList(createMax("metric1", 1200))),
createHistogramBucket(2000L, 5, Collections.singletonList(createMax("metric1", 2800)))
);
Histogram histogram = createHistogramAggregation("buckets", nestedHistogramBuckets);
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(createMax("time", 1000L), histogram))
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> aggToString("time", Sets.newHashSet("my_field"), histogramBuckets));
assertThat(e.getMessage(), containsString("More than one Date histogram cannot be used in the aggregation. " +
"[buckets] is another instance of a Date histogram"));
}
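For context (a hedged sketch, not code from this change): expressed with the standard aggregation builders rather than mocks, the shape this test rejects is one date histogram nested inside another, roughly:

// Two date_histogram aggregations nested inside one another; field names and the
// 1000 ms interval are illustrative. The processor is expected to reject this shape.
AggregationBuilder inner = AggregationBuilders.dateHistogram("buckets")
        .field("time").interval(1000)
        .subAggregation(AggregationBuilders.max("metric1").field("metric1"));
AggregationBuilder outer = AggregationBuilders.dateHistogram("time")
        .field("time").interval(1000)
        .subAggregation(AggregationBuilders.max("time").field("time"))
        .subAggregation(inner);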
public void testProcessGivenMaxTimeIsMissing() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3),
@ -52,8 +73,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
public void testProcessGivenNonMaxTimeAgg() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Collections.singletonList(createTerms("time", new Term("a", 1), new Term("b", 2)))),
createHistogramBucket(2000L, 5, Collections.singletonList(createTerms("time", new Term("a", 1), new Term("b", 2))))
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
@ -85,6 +106,42 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(keyValuePairsWritten, equalTo(2L));
}
public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException {
List<Histogram.Bucket> histogramABuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(
createMax("time", 1000), createSingleValue("my_value", 1.0))),
createHistogramBucket(2000L, 4, Arrays.asList(
createMax("time", 2000), createSingleValue("my_value", 2.0))),
createHistogramBucket(3000L, 5, Arrays.asList(
createMax("time", 3000), createSingleValue("my_value", 3.0)))
);
Histogram histogramA = createHistogramAggregation("buckets", histogramABuckets);
List<Histogram.Bucket> histogramBBuckets = Arrays.asList(
createHistogramBucket(1000L, 6, Arrays.asList(
createMax("time", 1000), createSingleValue("my_value", 10.0))),
createHistogramBucket(2000L, 7, Arrays.asList(
createMax("time", 2000), createSingleValue("my_value", 20.0))),
createHistogramBucket(3000L, 8, Arrays.asList(
createMax("time", 3000), createSingleValue("my_value", 30.0)))
);
Histogram histogramB = createHistogramAggregation("buckets", histogramBBuckets);
Terms terms = createTerms("my_field", new Term("A", 20, Collections.singletonList(histogramA)),
new Term("B", 2, Collections.singletonList(histogramB)));
String json = aggToString("time", Sets.newHashSet("my_value", "my_field"), true, createAggs(Collections.singletonList(terms)));
assertThat(json, equalTo("{\"my_field\":\"A\",\"time\":1000,\"my_value\":1.0,\"doc_count\":3} " +
"{\"my_field\":\"B\",\"time\":1000,\"my_value\":10.0,\"doc_count\":6} " +
"{\"my_field\":\"A\",\"time\":2000,\"my_value\":2.0,\"doc_count\":4} " +
"{\"my_field\":\"B\",\"time\":2000,\"my_value\":20.0,\"doc_count\":7} " +
"{\"my_field\":\"A\",\"time\":3000,\"my_value\":3.0,\"doc_count\":5} " +
"{\"my_field\":\"B\",\"time\":3000,\"my_value\":30.0,\"doc_count\":8}"
));
}
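The mocked response in this test mirrors a datafeed aggregation whose date histogram sits under a terms aggregation. A rough builder sketch of that request shape (field names and the avg metric are illustrative assumptions) is:

// terms("my_field") -> date_histogram("buckets") -> max("time") + a leaf metric ("my_value");
// the date histogram no longer has to be the top-level aggregation.
AggregationBuilder shape = AggregationBuilders.terms("my_field").field("my_field")
        .subAggregation(AggregationBuilders.dateHistogram("buckets")
                .field("time").interval(1000)
                .subAggregation(AggregationBuilders.max("time").field("time"))
                .subAggregation(AggregationBuilders.avg("my_value").field("my_value")));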
public void testProcessGivenSingleMetricPerHistogram() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(
@ -97,7 +154,9 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
String json = aggToString("time", Sets.newHashSet("my_value"), histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":3000,\"my_value\":3.0,\"doc_count\":5}"));
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} " +
"{\"time\":2000,\"doc_count\":3} " +
"{\"time\":3000,\"my_value\":3.0,\"doc_count\":5}"));
}
public void testProcessGivenTermsPerHistogram() throws IOException {
@ -225,29 +284,23 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
}
public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_BucketFirst() throws IOException {
Terms terms = createTerms("terms", new Term("a", 1), new Term("b", 2));
Max maxAgg = createMax("max_value", 1200);
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2, Arrays.asList(terms, createMax("time", 1000), maxAgg));

String json = aggToString("time", Sets.newHashSet("terms", "max_value"), histogramBucket);

assertThat(json, equalTo("{\"time\":1000,\"max_value\":1200.0,\"terms\":\"a\",\"doc_count\":1} " +
    "{\"time\":1000,\"max_value\":1200.0,\"terms\":\"b\",\"doc_count\":2}"));
}
public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_LeafFirst() throws IOException {
Max maxAgg = createMax("max_value", 1200);
Terms terms = createTerms("terms", new Term("a", 1), new Term("b", 2));
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2, Arrays.asList(createMax("time", 1000), maxAgg, terms));

String json = aggToString("time", Sets.newHashSet("terms", "max_value"), histogramBucket);

assertThat(json, equalTo("{\"time\":1000,\"max_value\":1200.0,\"terms\":\"a\",\"doc_count\":1} " +
    "{\"time\":1000,\"max_value\":1200.0,\"terms\":\"b\",\"doc_count\":2}"));
}
public void testProcessGivenBucketAndLeafAggregationsButBucketNotInFields() throws IOException {
@ -290,6 +343,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
"{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
"{\"time\":3000,\"doc_count\":10} " +
"{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
}
@ -310,6 +364,36 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(e.getMessage(), containsString("Multi-percentile aggregation [my_field] is not supported"));
}
public void testBucketAggContainsRequiredAgg() throws IOException {
Set<String> fields = new HashSet<>();
fields.add("foo");
AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false);
Terms termsAgg = mock(Terms.class);
when(termsAgg.getBuckets()).thenReturn(Collections.emptyList());
when(termsAgg.getName()).thenReturn("foo");
assertTrue(processor.bucketAggContainsRequiredAgg(termsAgg));
when(termsAgg.getName()).thenReturn("bar");
assertFalse(processor.bucketAggContainsRequiredAgg(termsAgg));
Terms nestedTermsAgg = mock(Terms.class);
when(nestedTermsAgg.getBuckets()).thenReturn(Collections.emptyList());
when(nestedTermsAgg.getName()).thenReturn("nested_terms");
StringTerms.Bucket bucket = mock(StringTerms.Bucket.class);
when(bucket.getAggregations()).thenReturn(new Aggregations(Collections.singletonList(nestedTermsAgg)));
when((List<Terms.Bucket>) termsAgg.getBuckets()).thenReturn(Collections.singletonList(bucket));
assertFalse(processor.bucketAggContainsRequiredAgg(termsAgg));
Max max = mock(Max.class);
when(max.getName()).thenReturn("foo");
StringTerms.Bucket nestedTermsBucket = mock(StringTerms.Bucket.class);
when(nestedTermsBucket.getAggregations()).thenReturn(new Aggregations(Collections.singletonList(max)));
when((List<Terms.Bucket>) nestedTermsAgg.getBuckets()).thenReturn(Collections.singletonList(nestedTermsBucket));
assertTrue(processor.bucketAggContainsRequiredAgg(termsAgg));
}
private String aggToString(String timeField, Set<String> fields, Histogram.Bucket bucket) throws IOException {
return aggToString(timeField, fields, true, Collections.singletonList(bucket));
}
@ -320,13 +404,20 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, List<Histogram.Bucket> buckets)
throws IOException {
Histogram histogram = createHistogramAggregation("buckets", buckets);
return aggToString(timeField, fields, includeDocCount, createAggs(Collections.singletonList(histogram)));
}
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, Aggregations aggregations)
throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount);
processor.process(aggregations);
processor.writeDocs(10000, outputStream);
keyValuePairsWritten = processor.getKeyValueCount();
return outputStream.toString(StandardCharsets.UTF_8.name());
}
}
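For orientation, the processor API exercised by the aggToString helpers above can be used on its own roughly as follows (a sketch assuming only the constructor and methods already shown in this test; checked IOExceptions are left to the caller):

// Sketch: feed a whole Aggregations tree to the processor, then flush up to
// 10000 of the resulting documents as JSON objects into an output stream.
ByteArrayOutputStream out = new ByteArrayOutputStream();
AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", Sets.newHashSet("my_value"), true);
processor.process(aggregations);   // 'aggregations' is the Aggregations tree from a search response
processor.writeDocs(10000, out);
long keyValuePairs = processor.getKeyValueCount();
String json = out.toString(StandardCharsets.UTF_8.name());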

View File

@ -16,6 +16,8 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -90,7 +92,12 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
assertTrue(putJobResponse.isAcknowledged());
DatafeedConfig.Builder configBuilder = createDatafeedBuilder("data_feed_id", job.getId(), Collections.singletonList("*"));
MaxAggregationBuilder maxAggregation = AggregationBuilders.max("time").field("time");
HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(300000)
    .subAggregation(maxAggregation).field("time");
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation));
configBuilder.setFrequency(TimeValue.timeValueMinutes(2));
DatafeedConfig config = configBuilder.build();
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
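An equivalent configuration using a date_histogram (a hedged sketch, not part of this change) follows the same rule: the histogram must carry a max aggregation on the time field as a sub-aggregation.

// Hypothetical date_histogram variant; "time" is assumed to be the job's time field
// and 300000 ms gives 5-minute buckets, matching the numeric histogram above.
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time")
        .interval(300000).subAggregation(maxTime).field("time");
configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(dateHistogram));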

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.Locale;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@ -47,8 +49,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
return true;
}
private void setupUser() throws IOException {
String password = new String(SecuritySettingsSource.TEST_PASSWORD_SECURE_STRING.getChars());
// This user has admin rights on machine learning, but (importantly for the tests) no
@ -60,7 +61,16 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
client().performRequest("put", "_xpack/security/user/ml_admin", Collections.emptyMap(),
new StringEntity(user, ContentType.APPLICATION_JSON));
}
@Before
public void setUpData() throws Exception {
setupUser();
addAirlineData();
addNetworkData();
}
private void addAirlineData() throws IOException {
String mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
@ -210,6 +220,49 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
client().performRequest("post", "_refresh");
}
private void addNetworkData() throws IOException {
// Create index with source = enabled, doc_values = enabled, stored = false + multi-field
String mappings = "{"
+ " \"mappings\": {"
+ " \"doc\": {"
+ " \"properties\": {"
+ " \"timestamp\": { \"type\":\"date\"},"
+ " \"host\": {"
+ " \"type\":\"text\","
+ " \"fields\":{"
+ " \"text\":{\"type\":\"text\"},"
+ " \"keyword\":{\"type\":\"keyword\"}"
+ " }"
+ " },"
+ " \"network_bytes_out\": { \"type\":\"long\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "network-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
String docTemplate = "{\"timestamp\":%d,\"host\":\"%s\",\"network_bytes_out\":%d}";
Date date = new Date(1464739200000L);
for (int i=0; i<120; i++) {
long byteCount = randomNonNegativeLong();
String jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostA", byteCount);
client().performRequest("post", "network-data/doc", Collections.emptyMap(),
new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
byteCount = randomNonNegativeLong();
jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostB", byteCount);
client().performRequest("post", "network-data/doc", Collections.emptyMap(),
new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
date = new Date(date.getTime() + 10_000);
}
// Ensure all data is searchable
client().performRequest("post", "_refresh");
}
public void testLookbackOnlyWithMixedTypes() throws Exception {
new LookbackOnlyTestHelper("test-lookback-only-with-mixed-types", "airline-data")
.setShouldSucceedProcessing(true).execute();
@ -372,6 +425,72 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testLookbackUsingDerivativeAgg() throws Exception {
String jobId = "derivative-agg-network-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"300s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"bytes-delta\",\"by_field_name\":\"hostname\"}]},"
+ "\"data_description\" : {\"time_field\":\"timestamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = "datafeed-" + jobId;
String aggregations =
"{\"hostname\": {\"terms\" : {\"field\": \"host.keyword\", \"size\":10},"
+ "\"aggs\": {\"buckets\": {\"date_histogram\":{\"field\":\"timestamp\",\"interval\":\"60s\"},"
+ "\"aggs\": {\"timestamp\":{\"max\":{\"field\":\"timestamp\"}},"
+ "\"bytes-delta\":{\"derivative\":{\"buckets_path\":\"avg_bytes_out\"}},"
+ "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}";
new DatafeedBuilder(datafeedId, jobId, "network-data", "doc").setAggregations(aggregations).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":40"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":40"));
assertThat(jobStatsResponseAsString, containsString("\"out_of_order_timestamp_count\":0"));
assertThat(jobStatsResponseAsString, containsString("\"bucket_count\":4"));
// The derivative agg won't have values for the first bucket of each host
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":2"));
}
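Built with the Java aggregation builders instead of a raw JSON string, the datafeed aggregation used above would look roughly like this (a hedged sketch; the names mirror the JSON and the derivative helper is the standard pipeline builder):

// hostname(terms) -> buckets(date_histogram, 60s) -> timestamp(max), avg_bytes_out(avg),
//                                                    bytes-delta(derivative of avg_bytes_out)
AggregationBuilder aggs = AggregationBuilders.terms("hostname").field("host.keyword").size(10)
        .subAggregation(AggregationBuilders.dateHistogram("buckets")
                .field("timestamp").dateHistogramInterval(DateHistogramInterval.seconds(60))
                .subAggregation(AggregationBuilders.max("timestamp").field("timestamp"))
                .subAggregation(AggregationBuilders.avg("avg_bytes_out").field("network_bytes_out"))
                .subAggregation(PipelineAggregatorBuilders.derivative("bytes-delta", "avg_bytes_out")));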
public void testLookbackWithPipelineBucketAgg() throws Exception {
String jobId = "pipeline-bucket-agg-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"percentile95_airlines_count\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"15m\"},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"airlines\":{\"terms\":{\"field\":\"airline.keyword\",\"size\":10}},"
+ "\"percentile95_airlines_count\":{\"percentiles_bucket\":" +
"{\"buckets_path\":\"airlines._count\", \"percents\": [95]}}}}}";
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").setAggregations(aggregations).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"input_field_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_field_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"out_of_order_timestamp_count\":0"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testRealtime() throws Exception {
String jobId = "job-realtime-1";
createJob(jobId, "airline");