review comment fixes
This commit is contained in:
parent
26189ee2e6
commit
31f26ec115
|
@ -160,14 +160,6 @@ public class AggregatorFactories {
|
||||||
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedReducers);
|
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedReducers);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* L ← Empty list that will contain the sorted nodes while there are
|
|
||||||
* unmarked nodes do select an unmarked node n visit(n) function
|
|
||||||
* visit(node n) if n has a temporary mark then stop (not a DAG) if n is
|
|
||||||
* not marked (i.e. has not been visited yet) then mark n temporarily
|
|
||||||
* for each node m with an edge from n to m do visit(m) mark n
|
|
||||||
* permanently unmark n temporarily add n to head of L
|
|
||||||
*/
|
|
||||||
private List<ReducerFactory> resolveReducerOrder(List<ReducerFactory> reducerFactories, List<AggregatorFactory> aggFactories) {
|
private List<ReducerFactory> resolveReducerOrder(List<ReducerFactory> reducerFactories, List<AggregatorFactory> aggFactories) {
|
||||||
Map<String, ReducerFactory> reducerFactoriesMap = new HashMap<>();
|
Map<String, ReducerFactory> reducerFactoriesMap = new HashMap<>();
|
||||||
for (ReducerFactory factory : reducerFactories) {
|
for (ReducerFactory factory : reducerFactories) {
|
||||||
|
@ -184,10 +176,6 @@ public class AggregatorFactories {
|
||||||
ReducerFactory factory = unmarkedFactories.get(0);
|
ReducerFactory factory = unmarkedFactories.get(0);
|
||||||
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked, factory);
|
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked, factory);
|
||||||
}
|
}
|
||||||
List<String> orderedReducerNames = new ArrayList<>();
|
|
||||||
for (ReducerFactory reducerFactory : orderedReducers) {
|
|
||||||
orderedReducerNames.add(reducerFactory.getName());
|
|
||||||
}
|
|
||||||
return orderedReducers;
|
return orderedReducers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,13 +98,4 @@ public abstract class InternalMultiBucketAggregation<A extends InternalMultiBuck
|
||||||
return aggregation.getProperty(path.subList(1, path.size()));
|
return aggregation.getProperty(path.subList(1, path.size()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static abstract class Factory<A extends InternalMultiBucketAggregation, B extends InternalMultiBucketAggregation.InternalBucket> {
|
|
||||||
|
|
||||||
public abstract String type();
|
|
||||||
|
|
||||||
public abstract A create(List<B> buckets, A prototype);
|
|
||||||
|
|
||||||
public abstract B createBucket(InternalAggregations aggregations, B prototype);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,8 +43,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private IntArray docCounts;
|
private IntArray docCounts;
|
||||||
|
|
||||||
public BucketsAggregator(String name, AggregatorFactories factories,
|
public BucketsAggregator(String name, AggregatorFactories factories, AggregationContext context, Aggregator parent,
|
||||||
AggregationContext context, Aggregator parent,
|
|
||||||
List<Reducer> reducers, Map<String, Object> metaData) throws IOException {
|
List<Reducer> reducers, Map<String, Object> metaData) throws IOException {
|
||||||
super(name, factories, context, parent, reducers, metaData);
|
super(name, factories, context, parent, reducers, metaData);
|
||||||
bigArrays = context.bigArrays();
|
bigArrays = context.bigArrays();
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Factory<B extends InternalHistogram.Bucket> extends InternalMultiBucketAggregation.Factory<InternalHistogram<B>, B> {
|
public static class Factory<B extends InternalHistogram.Bucket> {
|
||||||
|
|
||||||
protected Factory() {
|
protected Factory() {
|
||||||
}
|
}
|
||||||
|
@ -249,13 +249,11 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
||||||
return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, reducers, metaData);
|
return new InternalHistogram<>(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed, this, reducers, metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public InternalHistogram<B> create(List<B> buckets, InternalHistogram<B> prototype) {
|
public InternalHistogram<B> create(List<B> buckets, InternalHistogram<B> prototype) {
|
||||||
return new InternalHistogram<>(prototype.name, buckets, prototype.order, prototype.minDocCount, prototype.emptyBucketInfo,
|
return new InternalHistogram<>(prototype.name, buckets, prototype.order, prototype.minDocCount, prototype.emptyBucketInfo,
|
||||||
prototype.formatter, prototype.keyed, this, prototype.reducers(), prototype.metaData);
|
prototype.formatter, prototype.keyed, this, prototype.reducers(), prototype.metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public B createBucket(InternalAggregations aggregations, B prototype) {
|
public B createBucket(InternalAggregations aggregations, B prototype) {
|
||||||
return (B) new Bucket(prototype.key, prototype.docCount, prototype.getKeyed(), prototype.formatter, this, aggregations);
|
return (B) new Bucket(prototype.key, prototype.docCount, prototype.getKeyed(), prototype.formatter, this, aggregations);
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,7 +225,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Factory<B extends Bucket, R extends InternalRange<B, R>> extends InternalMultiBucketAggregation.Factory<R, B> {
|
public static class Factory<B extends Bucket, R extends InternalRange<B, R>> {
|
||||||
|
|
||||||
public String type() {
|
public String type() {
|
||||||
return TYPE.name();
|
return TYPE.name();
|
||||||
|
@ -236,18 +236,16 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
|
||||||
return (R) new InternalRange<>(name, ranges, formatter, keyed, reducers, metaData);
|
return (R) new InternalRange<>(name, ranges, formatter, keyed, reducers, metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed,
|
||||||
public B createBucket(String key, double from, double to, long docCount, InternalAggregations aggregations, boolean keyed, @Nullable ValueFormatter formatter) {
|
@Nullable ValueFormatter formatter) {
|
||||||
return (B) new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
|
return (B) new Bucket(key, from, to, docCount, aggregations, keyed, formatter);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public R create(List<B> ranges, R prototype) {
|
public R create(List<B> ranges, R prototype) {
|
||||||
return (R) new InternalRange<>(prototype.name, ranges, prototype.formatter, prototype.keyed, prototype.reducers(),
|
return (R) new InternalRange<>(prototype.name, ranges, prototype.formatter, prototype.keyed, prototype.reducers(),
|
||||||
prototype.metaData);
|
prototype.metaData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public B createBucket(InternalAggregations aggregations, B prototype) {
|
public B createBucket(InternalAggregations aggregations, B prototype) {
|
||||||
return (B) new Bucket(prototype.getKey(), prototype.from, prototype.to, prototype.getDocCount(), aggregations, prototype.keyed,
|
return (B) new Bucket(prototype.getKey(), prototype.from, prototype.to, prototype.getDocCount(), aggregations, prototype.keyed,
|
||||||
prototype.formatter);
|
prototype.formatter);
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class BucketHelpers {
|
||||||
* "ignore": empty buckets will simply be ignored
|
* "ignore": empty buckets will simply be ignored
|
||||||
*/
|
*/
|
||||||
public static enum GapPolicy {
|
public static enum GapPolicy {
|
||||||
INSERT_ZEROS((byte) 0, "insert_zeros"), IGNORE((byte) 1, "ignore");
|
INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse a string GapPolicy into the byte enum
|
* Parse a string GapPolicy into the byte enum
|
||||||
|
@ -172,7 +172,7 @@ public class BucketHelpers {
|
||||||
switch (gapPolicy) {
|
switch (gapPolicy) {
|
||||||
case INSERT_ZEROS:
|
case INSERT_ZEROS:
|
||||||
return 0.0;
|
return 0.0;
|
||||||
case IGNORE:
|
case SKIP:
|
||||||
default:
|
default:
|
||||||
return Double.NaN;
|
return Double.NaN;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class MaxBucketParser implements Reducer.Parser {
|
||||||
String currentFieldName = null;
|
String currentFieldName = null;
|
||||||
String[] bucketsPaths = null;
|
String[] bucketsPaths = null;
|
||||||
String format = null;
|
String format = null;
|
||||||
GapPolicy gapPolicy = GapPolicy.IGNORE;
|
GapPolicy gapPolicy = GapPolicy.SKIP;
|
||||||
|
|
||||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||||
if (token == XContentParser.Token.FIELD_NAME) {
|
if (token == XContentParser.Token.FIELD_NAME) {
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class DerivativeParser implements Reducer.Parser {
|
||||||
String currentFieldName = null;
|
String currentFieldName = null;
|
||||||
String[] bucketsPaths = null;
|
String[] bucketsPaths = null;
|
||||||
String format = null;
|
String format = null;
|
||||||
GapPolicy gapPolicy = GapPolicy.IGNORE;
|
GapPolicy gapPolicy = GapPolicy.SKIP;
|
||||||
|
|
||||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||||
if (token == XContentParser.Token.FIELD_NAME) {
|
if (token == XContentParser.Token.FIELD_NAME) {
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class MovAvgParser implements Reducer.Parser {
|
||||||
String[] bucketsPaths = null;
|
String[] bucketsPaths = null;
|
||||||
String format = null;
|
String format = null;
|
||||||
|
|
||||||
GapPolicy gapPolicy = GapPolicy.IGNORE;
|
GapPolicy gapPolicy = GapPolicy.SKIP;
|
||||||
int window = 5;
|
int window = 5;
|
||||||
Map<String, Object> settings = null;
|
Map<String, Object> settings = null;
|
||||||
String model = "simple";
|
String model = "simple";
|
||||||
|
|
|
@ -51,7 +51,6 @@ import static org.hamcrest.core.IsNull.notNullValue;
|
||||||
import static org.hamcrest.core.IsNull.nullValue;
|
import static org.hamcrest.core.IsNull.nullValue;
|
||||||
|
|
||||||
@ElasticsearchIntegrationTest.SuiteScopeTest
|
@ElasticsearchIntegrationTest.SuiteScopeTest
|
||||||
//@AwaitsFix(bugUrl = "Fix factory selection for serialisation of Internal derivative")
|
|
||||||
public class DateDerivativeTests extends ElasticsearchIntegrationTest {
|
public class DateDerivativeTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
private DateTime date(int month, int day) {
|
private DateTime date(int month, int day) {
|
||||||
|
|
|
@ -121,7 +121,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
|
||||||
alpha = randomDouble();
|
alpha = randomDouble();
|
||||||
beta = randomDouble();
|
beta = randomDouble();
|
||||||
|
|
||||||
gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.IGNORE : BucketHelpers.GapPolicy.INSERT_ZEROS;
|
gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
|
||||||
metric = randomMetric("the_metric", VALUE_FIELD);
|
metric = randomMetric("the_metric", VALUE_FIELD);
|
||||||
mockHisto = ReducerHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
|
mockHisto = ReducerHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
|
||||||
// Gaps only apply to metric values, not doc _counts
|
// Gaps only apply to metric values, not doc _counts
|
||||||
if (mockBucket.count == 0 && target.equals(MetricTarget.VALUE)) {
|
if (mockBucket.count == 0 && target.equals(MetricTarget.VALUE)) {
|
||||||
// If there was a gap in doc counts and we are ignoring, just skip this bucket
|
// If there was a gap in doc counts and we are ignoring, just skip this bucket
|
||||||
if (gapPolicy.equals(BucketHelpers.GapPolicy.IGNORE)) {
|
if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
|
||||||
values.add(null);
|
values.add(null);
|
||||||
continue;
|
continue;
|
||||||
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
||||||
|
@ -726,7 +726,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(current, notNullValue());
|
assertThat(current, notNullValue());
|
||||||
currentValue = current.value();
|
currentValue = current.value();
|
||||||
|
|
||||||
if (gapPolicy.equals(BucketHelpers.GapPolicy.IGNORE)) {
|
if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
|
||||||
// if we are ignoring, movavg could go up (double_exp) or stay the same (simple, linear, single_exp)
|
// if we are ignoring, movavg could go up (double_exp) or stay the same (simple, linear, single_exp)
|
||||||
assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
|
assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
|
||||||
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
||||||
|
@ -785,7 +785,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(current, notNullValue());
|
assertThat(current, notNullValue());
|
||||||
currentValue = current.value();
|
currentValue = current.value();
|
||||||
|
|
||||||
if (gapPolicy.equals(BucketHelpers.GapPolicy.IGNORE)) {
|
if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
|
||||||
// if we are ignoring, movavg could go up (double_exp) or stay the same (simple, linear, single_exp)
|
// if we are ignoring, movavg could go up (double_exp) or stay the same (simple, linear, single_exp)
|
||||||
assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
|
assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
|
||||||
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
} else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
|
||||||
|
|
Loading…
Reference in New Issue