Merge pull request #11196 from polyfractal/feature/aggs_2_0_diff

Aggregations: add serial differencing pipeline aggregation
Zachary Tong 2015-07-10 18:26:19 -04:00
commit bb9c160855
11 changed files with 750 additions and 0 deletions

View File

@ -67,6 +67,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParse
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffParser;
import java.util.List;
@ -120,6 +121,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
pipelineAggParsers.add(CumulativeSumParser.class);
pipelineAggParsers.add(BucketScriptParser.class);
pipelineAggParsers.add(BucketSelectorParser.class);
pipelineAggParsers.add(SerialDiffParser.class);
}
/**

View File

@ -72,6 +72,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivat
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
/**
* A module that registers all the transport streams for the addAggregation
@ -133,6 +134,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();
BucketSelectorPipelineAggregator.registerStreams();
SerialDiffPipelineAggregator.registerStreams();
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptB
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffBuilder;
public final class PipelineAggregatorBuilders {
@ -69,4 +70,8 @@ public final class PipelineAggregatorBuilders {
public static final CumulativeSumBuilder cumulativeSum(String name) {
return new CumulativeSumBuilder(name);
}
public static final SerialDiffBuilder diff(String name) {
return new SerialDiffBuilder(name);
}
}
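
For context, a hedged usage sketch of the new `diff()` entry point. Index, field, and aggregation names here are illustrative, and it assumes static imports of `histogram()`/`sum()` from AggregationBuilders and `diff()` from PipelineAggregatorBuilders, plus a `client` handle, mirroring the request built in SerialDiffTests below:

SearchResponse response = client.prepareSearch("idx")
        .addAggregation(
                histogram("histo").field("timestamp").interval(1)
                        .subAggregation(sum("the_sum").field("lemmings"))
                        .subAggregation(diff("first_difference")
                                .lag(1)
                                .setBucketsPaths("the_sum")))
        .execute().actionGet();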

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
import java.io.IOException;
public class SerialDiffBuilder extends PipelineAggregatorBuilder<SerialDiffBuilder> {
private String format;
private GapPolicy gapPolicy;
private Integer lag;
public SerialDiffBuilder(String name) {
super(name, SerialDiffPipelineAggregator.TYPE.name());
}
public SerialDiffBuilder format(String format) {
this.format = format;
return this;
}
public SerialDiffBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
public SerialDiffBuilder lag(Integer lag) {
this.lag = lag;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(SerialDiffParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
if (lag != null) {
builder.field(SerialDiffParser.LAG.getPreferredName(), lag);
}
return builder;
}
}
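
A hedged sketch of configuring the builder programmatically; the commented request fragment is an assumption based on internalXContent() above plus the base PipelineAggregatorBuilder, not verified output:

// Should render roughly as:
// "my_diff": { "serial_diff": { "buckets_path": "the_sum", "lag": 7, "gap_policy": "skip" } }
SerialDiffBuilder builder = new SerialDiffBuilder("my_diff").lag(7).gapPolicy(GapPolicy.SKIP);
builder.setBucketsPaths("the_sum");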

View File

@ -0,0 +1,116 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
public class SerialDiffParser implements PipelineAggregator.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField LAG = new ParseField("lag");
@Override
public String type() {
return SerialDiffPipelineAggregator.TYPE.name();
}
@Override
public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.SKIP;
int lag = 1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (context.parseFieldMatcher().match(currentFieldName, FORMAT)) {
format = parser.text();
} else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
bucketsPaths = new String[] { parser.text() };
} else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (context.parseFieldMatcher().match(currentFieldName, LAG)) {
lag = parser.intValue(true);
if (lag <= 0) {
throw new SearchParseException(context, "Lag must be a positive, non-zero integer. Value supplied was" +
lag + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
}
if (bucketsPaths == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
}
ValueFormatter formatter;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
} else {
formatter = ValueFormatter.RAW;
}
return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag);
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.pipeline.*;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
public class SerialDiffPipelineAggregator extends PipelineAggregator {
public final static Type TYPE = new Type("serial_diff");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public SerialDiffPipelineAggregator readResult(StreamInput in) throws IOException {
SerialDiffPipelineAggregator result = new SerialDiffPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
}
private ValueFormatter formatter;
private GapPolicy gapPolicy;
private int lag;
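// No-arg constructor used by the STREAM deserialization path above; fields are populated in doReadFrom()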
public SerialDiffPipelineAggregator() {
}
public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
int lag, Map<String, Object> metadata) {
super(name, bucketsPaths, metadata);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.lag = lag;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalHistogram histo = (InternalHistogram) aggregation;
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
List<InternalHistogram.Bucket> newBuckets = new ArrayList<>();
EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
int counter = 0;
for (InternalHistogram.Bucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
InternalHistogram.Bucket newBucket = bucket;
counter += 1;
// Still under the initial lag period, add nothing and move on
Double lagValue;
if (counter <= lag) {
lagValue = Double.NaN;
} else {
lagValue = lagWindow.peek(); // Peek here, because we rely on add() to always advance the window
}
// Normalize nulls to NaN
if (thisBucketValue == null) {
thisBucketValue = Double.NaN;
}
// Both have values, calculate diff and replace the "empty" bucket
if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) {
double diff = thisBucketValue - lagValue;
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData()));
newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
aggs), bucket.getKeyed(), bucket.getFormatter());
}
newBuckets.add(newBucket);
lagWindow.add(thisBucketValue);
}
return factory.create(newBuckets, histo);
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
lag = in.readVInt();
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
out.writeVInt(lag);
}
public static class Factory extends PipelineAggregatorFactory {
private final ValueFormatter formatter;
private GapPolicy gapPolicy;
private int lag;
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) {
super(name, TYPE.name(), bucketsPaths);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.lag = lag;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData);
}
}
}
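
The heart of reduce() above is the lag window: peek at the value `lag` buckets back, then add the current value so the window advances by one. A standalone sketch of just that technique with the aggregation plumbing removed; the method name and array-based signature are illustrative, and it reuses the Guava EvictingQueue already imported above:

// Illustrative sketch of the lag-window technique from reduce().
// Positions before the window fills yield NaN, matching the initial lag period above.
static double[] serialDiff(double[] values, int lag) {
    EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
    double[] diffs = new double[values.length];
    for (int i = 0; i < values.length; i++) {
        // Until lag values have been seen, there is nothing to subtract
        Double lagValue = (i < lag) ? Double.NaN : lagWindow.peek();
        diffs[i] = values[i] - lagValue;
        lagWindow.add(values[i]); // once full, add() evicts the oldest entry
    }
    return diffs;
}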

View File

@ -0,0 +1,291 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.serialdiff;
import com.google.common.collect.EvictingQueue;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.metrics.ValuesSourceMetricsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregationHelperTests;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.diff;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class SerialDiffTests extends ElasticsearchIntegrationTest {
private static final String INTERVAL_FIELD = "l_value";
private static final String VALUE_FIELD = "v_value";
static int interval;
static int numBuckets;
static int lag;
static BucketHelpers.GapPolicy gapPolicy;
static ValuesSourceMetricsAggregationBuilder metric;
static List<PipelineAggregationHelperTests.MockBucket> mockHisto;
static Map<String, ArrayList<Double>> testValues;
enum MetricTarget {
VALUE ("value"), COUNT("count");
private final String name;
MetricTarget(String s) {
name = s;
}
@Override
public String toString() {
return name;
}
}
private ValuesSourceMetricsAggregationBuilder randomMetric(String name, String field) {
int rand = randomIntBetween(0,3);
switch (rand) {
case 0:
return min(name).field(field);
case 2:
return max(name).field(field);
case 3:
return avg(name).field(field);
default:
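// note: case 1 has no dedicated branch and falls through to the default (avg)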
return avg(name).field(field);
}
}
private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
if (!expectedBucketIter.hasNext()) {
fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");
}
if (!expectedCountsIter.hasNext()) {
fail("`expectedCountsIter` iterator ended before `actual` iterator, size mismatch");
}
if (!expectedValuesIter.hasNext()) {
fail("`expectedValuesIter` iterator ended before `actual` iterator, size mismatch");
}
}
private void assertBucketContents(Histogram.Bucket actual, Double expectedCount, Double expectedValue) {
// A null expected count means this bucket was a gap
SimpleValue countDiff = actual.getAggregations().get("diff_counts");
if (expectedCount == null) {
assertThat("[_count] diff is not null", countDiff, nullValue());
} else {
assertThat("[_count] diff is null", countDiff, notNullValue());
assertThat("[_count] diff does not match expected [" + countDiff.value() + " vs " + expectedCount + "]",
countDiff.value(), closeTo(expectedCount, 0.1));
}
// A null expected value means this bucket was a gap
SimpleValue valuesDiff = actual.getAggregations().get("diff_values");
if (expectedValue == null) {
assertThat("[value] diff is not null", valuesDiff, Matchers.nullValue());
} else {
assertThat("[value] diff is null", valuesDiff, notNullValue());
assertThat("[value] diff does not match expected [" + valuesDiff.value() + " vs " + expectedValue + "]",
valuesDiff.value(), closeTo(expectedValue, 0.1));
}
}
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
List<IndexRequestBuilder> builders = new ArrayList<>();
interval = 5;
numBuckets = randomIntBetween(10, 80);
lag = randomIntBetween(1, numBuckets / 2);
gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
metric = randomMetric("the_metric", VALUE_FIELD);
mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
testValues = new HashMap<>(8);
for (MetricTarget target : MetricTarget.values()) {
setupExpected(target);
}
for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) {
for (double value : mockBucket.docValues) {
builders.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject()
.field(INTERVAL_FIELD, mockBucket.key)
.field(VALUE_FIELD, value).endObject()));
}
}
indexRandom(true, builders);
ensureSearchable();
}
/**
* @param target The document field "target", e.g. _count or a field value
*/
private void setupExpected(MetricTarget target) {
ArrayList<Double> values = new ArrayList<>(numBuckets);
EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
int counter = 0;
for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) {
Double metricValue;
double[] docValues = mockBucket.docValues;
// Gaps only apply to metric values, not doc _counts
if (mockBucket.count == 0 && target.equals(MetricTarget.VALUE)) {
// If there was a gap in doc counts and we are ignoring, just skip this bucket
if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
metricValue = null;
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
// otherwise insert a zero instead of the true value
metricValue = 0.0;
} else {
metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric);
}
} else {
// If this isn't a gap, or is a _count, just insert the value
metricValue = target.equals(MetricTarget.VALUE) ? PipelineAggregationHelperTests.calculateMetric(docValues, metric) : mockBucket.count;
}
counter += 1;
// Still under the initial lag period, add nothing and move on
Double lagValue;
if (counter <= lag) {
lagValue = Double.NaN;
} else {
lagValue = lagWindow.peek(); // Peek here, because we rely on add() to always advance the window
}
// Normalize nulls to NaN
if (metricValue == null) {
metricValue = Double.NaN;
}
// Both have values, calculate diff and replace the "empty" bucket
if (!Double.isNaN(metricValue) && !Double.isNaN(lagValue)) {
double diff = metricValue - lagValue;
values.add(diff);
} else {
values.add(null); // The tests need null, even though the agg doesn't
}
lagWindow.add(metricValue);
}
testValues.put(target.toString(), values);
}
@Test
public void basicDiff() {
SearchResponse response = client()
.prepareSearch("idx").setTypes("type")
.addAggregation(
histogram("histo").field(INTERVAL_FIELD).interval(interval)
.extendedBounds(0L, (long) (interval * (numBuckets - 1)))
.subAggregation(metric)
.subAggregation(diff("diff_counts")
.lag(lag)
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
.subAggregation(diff("diff_values")
.lag(lag)
.gapPolicy(gapPolicy)
.setBucketsPaths("the_metric"))
).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<InternalHistogram.Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MetricTarget.COUNT.toString());
List<Double> expectedValues = testValues.get(MetricTarget.VALUE.toString());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
Iterator<Double> expectedCountsIter = expectedCounts.iterator();
Iterator<Double> expectedValuesIter = expectedValues.iterator();
while (actualIter.hasNext()) {
assertValidIterators(expectedBucketIter, expectedCountsIter, expectedValuesIter);
Histogram.Bucket actual = actualIter.next();
PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next();
Double expectedCount = expectedCountsIter.next();
Double expectedValue = expectedValuesIter.next();
assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key));
assertThat("doc counts do not match", actual.getDocCount(), equalTo((long)expected.count));
assertBucketContents(actual, expectedCount, expectedValue);
}
}
@Test
public void invalidLagSize() {
try {
client()
.prepareSearch("idx").setTypes("type")
.addAggregation(
histogram("histo").field(INTERVAL_FIELD).interval(interval)
.extendedBounds(0L, (long) (interval * (numBuckets - 1)))
.subAggregation(metric)
.subAggregation(diff("diff_counts")
.lag(-1)
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
).execute().actionGet();
fail("Expected SearchPhaseExecutionException for a negative lag");
} catch (SearchPhaseExecutionException e) {
// Expected: a negative lag must be rejected at parse time
}
}
}

View File

@ -167,3 +167,4 @@ include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[]
include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/series-arithmetic-aggregation.asciidoc[]

View File

@ -0,0 +1,105 @@
[[search-aggregations-pipeline-serialdiff-aggregation]]
=== Serial Differencing Aggregation
coming[2.0.0]
experimental[]
Serial differencing is a technique where values in a time series are subtracted from themselves at
different time lags or periods. For example, each datapoint becomes f(x~t~) - f(x~t-n~), where n is the period being used;
with a period of 1, the series 1, 4, 9, 16 becomes 3, 5, 7.
A period of 1 is equivalent to a derivative with no time normalization: it is simply the change from one point to the
next. Single periods are useful for removing constant, linear trends.
Single periods are also useful for transforming data into a stationary series. In this example, the Dow Jones is
plotted over ~250 days. The raw data is not stationary, which would make it difficult to use with some techniques.
By calculating the first-difference, we de-trend the data (e.g. remove a constant, linear trend). We can see that the
data becomes a stationary series (e.g. the first difference is randomly distributed around zero, and doesn't seem to
exhibit any pattern/behavior). The transformation reveals that the dataset is following a random-walk; the value is the
previous value +/- a random amount. This insight allows selection of further tools for analysis.
[[serialdiff_dow]]
.Dow Jones plotted and made stationary with first-differencing
image::images/pipeline_serialdiff/dow.png[]
Larger periods can be used to remove seasonal / cyclic behavior. In this example, a population of lemmings was
synthetically generated with a sine wave + constant linear trend + random noise. The sine wave has a period of 30 days.
The first-difference removes the constant trend, leaving just a sine wave. The 30th-difference is then applied to the
first-difference to remove the cyclic behavior, leaving a stationary series which is amenable to other analysis.
[[serialdiff_lemmings]]
.Lemmings data plotted and made stationary with 1st and 30th difference
image::images/pipeline_serialdiff/lemmings.png[]
==== Syntax
A `serial_diff` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"serial_diff": {
"buckets_path": "the_sum",
"lag": "7"
}
}
--------------------------------------------------
.`serial_diff` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |Path to the metric of interest (see <<bucket-path-syntax, `buckets_path` Syntax>> for more details) |Required |
|`lag` |The historical bucket to subtract from the current value. E.g. a lag of 7 subtracts the value seven
buckets ago from the current value. Must be a positive, non-zero integer |Optional |`1`
|`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`skip`
|`format` |Format to apply to the output value of this aggregation |Optional | `null`
|===
`serial_diff` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation:
[source,js]
--------------------------------------------------
{
"aggs": {
"my_date_histo": { <1>
"date_histogram": {
"field": "timestamp",
"interval": "day"
},
"aggs": {
"the_sum": {
"sum": {
"field": "lemmings" <2>
}
},
"thirtieth_difference": {
"serial_diff": { <3>
"buckets_path": "lemmings",
"lag" : 30
}
}
}
}
}
}
--------------------------------------------------
<1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals
<2> A `sum` metric is used to calculate the sum of a field. This could be any metric (sum, min, max, etc)
<3> Finally, we specify a `serial_diff` aggregation which uses "the_sum" metric as its input.
Serial differences are built by first specifying a `histogram` or `date_histogram` over a field. You can then optionally
add normal metrics, such as a `sum`, inside of that histogram. Finally, the `serial_diff` is embedded inside the histogram.
The `buckets_path` parameter is then used to "point" at one of the sibling metrics inside of the histogram (see
<<bucket-path-syntax>> for a description of the syntax for `buckets_path`).

Two binary images added (not shown): the docs figures referenced above, images/pipeline_serialdiff/dow.png (106 KiB) and images/pipeline_serialdiff/lemmings.png (136 KiB).