Aggregations Refactor: Refactor Cardinality Aggregation
This commit is contained in:
parent
8499e27dc5
commit
39a951f1c0
|
@ -19,29 +19,61 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.metrics.cardinality;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
final class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource> {
|
||||
public final class CardinalityAggregatorFactory<VS extends ValuesSource> extends ValuesSourceAggregatorFactory.LeafOnly<VS> {
|
||||
|
||||
private final long precisionThreshold;
|
||||
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
|
||||
|
||||
CardinalityAggregatorFactory(String name, ValuesSourceParser.Input<ValuesSource> input, long precisionThreshold) {
|
||||
super(name, InternalCardinality.TYPE, input);
|
||||
private Long precisionThreshold = null;
|
||||
|
||||
public CardinalityAggregatorFactory(String name, ValuesSourceType valuesSourceType, ValueType valueType) {
|
||||
super(name, InternalCardinality.TYPE, valuesSourceType, valueType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a precision threshold. Higher values improve accuracy but also
|
||||
* increase memory usage.
|
||||
*/
|
||||
public void precisionThreshold(long precisionThreshold) {
|
||||
this.precisionThreshold = precisionThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the precision threshold. Higher values improve accuracy but also
|
||||
* increase memory usage. Will return <code>null</code> if the
|
||||
* precisionThreshold has not been set yet.
|
||||
*/
|
||||
public Long precisionThreshold() {
|
||||
return precisionThreshold;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated no replacement - values will always be rehashed
|
||||
*/
|
||||
@Deprecated
|
||||
public void rehash(boolean rehash) {
|
||||
// Deprecated all values are already rehashed so do nothing
|
||||
}
|
||||
|
||||
private int precision(Aggregator parent) {
|
||||
return precisionThreshold < 0 ? defaultPrecision(parent) : HyperLogLogPlusPlus.precisionFromThreshold(precisionThreshold);
|
||||
return precisionThreshold == null ? defaultPrecision(parent) : HyperLogLogPlusPlus.precisionFromThreshold(precisionThreshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,12 +83,50 @@ final class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory.L
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext context, Aggregator parent,
|
||||
protected Aggregator doCreateInternal(VS valuesSource, AggregationContext context, Aggregator parent,
|
||||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
||||
return new CardinalityAggregator(name, valuesSource, precision(parent), config.formatter(), context, parent, pipelineAggregators,
|
||||
metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ValuesSourceAggregatorFactory<VS> innerReadFrom(String name, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType, StreamInput in) throws IOException {
|
||||
CardinalityAggregatorFactory<VS> factory = new CardinalityAggregatorFactory<>(name, valuesSourceType, targetValueType);
|
||||
if (in.readBoolean()) {
|
||||
factory.precisionThreshold = in.readLong();
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerWriteTo(StreamOutput out) throws IOException {
|
||||
boolean hasPrecisionThreshold = precisionThreshold != null;
|
||||
out.writeBoolean(hasPrecisionThreshold);
|
||||
if (hasPrecisionThreshold) {
|
||||
out.writeLong(precisionThreshold);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
if (precisionThreshold != null) {
|
||||
builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int innerHashCode() {
|
||||
return Objects.hash(precisionThreshold);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean innerEquals(Object obj) {
|
||||
CardinalityAggregatorFactory<ValuesSource> other = (CardinalityAggregatorFactory<ValuesSource>) obj;
|
||||
return Objects.equals(precisionThreshold, other.precisionThreshold);
|
||||
}
|
||||
|
||||
/*
|
||||
* If one of the parent aggregators is a MULTI_BUCKET one, we might want to lower the precision
|
||||
* because otherwise it might be memory-intensive. On the other hand, for top-level aggregators
|
||||
|
|
|
@ -20,66 +20,62 @@
|
|||
package org.elasticsearch.search.aggregations.metrics.cardinality;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.AbstractValuesSourceParser.AnyValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class CardinalityParser implements Aggregator.Parser {
|
||||
public class CardinalityParser extends AnyValuesSourceParser {
|
||||
|
||||
private static final ParseField PRECISION_THRESHOLD = new ParseField("precision_threshold");
|
||||
private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed");
|
||||
|
||||
public CardinalityParser() {
|
||||
super(true, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return InternalCardinality.TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String name, XContentParser parser, SearchContext context) throws IOException {
|
||||
protected ValuesSourceAggregatorFactory<ValuesSource> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
|
||||
CardinalityAggregatorFactory<ValuesSource> factory = new CardinalityAggregatorFactory<>(aggregationName, valuesSourceType,
|
||||
targetValueType);
|
||||
Long precisionThreshold = (Long) otherOptions.get(CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD);
|
||||
if (precisionThreshold != null) {
|
||||
factory.precisionThreshold(precisionThreshold);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
ValuesSourceParser<ValuesSource> vsParser = ValuesSourceParser.any(name, InternalCardinality.TYPE, context).formattable(false)
|
||||
.build();
|
||||
|
||||
long precisionThreshold = -1;
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (vsParser.token(currentFieldName, token, parser)) {
|
||||
continue;
|
||||
} else if (token.isValue()) {
|
||||
if (context.parseFieldMatcher().match(currentFieldName, REHASH)) {
|
||||
// ignore
|
||||
} else if (context.parseFieldMatcher().match(currentFieldName, PRECISION_THRESHOLD)) {
|
||||
precisionThreshold = parser.longValue();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + name + "]: [" + currentFieldName
|
||||
+ "].", parser.getTokenLocation());
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + name + "].", parser.getTokenLocation());
|
||||
@Override
|
||||
protected boolean token(String aggregationName, String currentFieldName, Token token, XContentParser parser,
|
||||
ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException {
|
||||
if (token.isValue()) {
|
||||
if (parseFieldMatcher.match(currentFieldName, CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD)) {
|
||||
otherOptions.put(CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD, parser.longValue());
|
||||
return true;
|
||||
} else if (parseFieldMatcher.match(currentFieldName, REHASH)) {
|
||||
// ignore
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
ValuesSourceParser.Input<ValuesSource> input = vsParser.input();
|
||||
|
||||
return new CardinalityAggregatorFactory(name, input, precisionThreshold);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory[] getFactoryPrototypes() {
|
||||
return null;
|
||||
return new AggregatorFactory[] { new CardinalityAggregatorFactory<ValuesSource>(null, null, null) };
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.metrics.cardinality;
|
||||
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
public class CardinalityTests extends BaseAggregationTestCase<CardinalityAggregatorFactory<? extends ValuesSource>> {
|
||||
|
||||
@Override
|
||||
protected final CardinalityAggregatorFactory<? extends ValuesSource> createTestAggregatorFactory() {
|
||||
CardinalityAggregatorFactory<ValuesSource> factory = new CardinalityAggregatorFactory<ValuesSource>("foo", ValuesSourceType.ANY,
|
||||
null);
|
||||
String field = randomNumericField();
|
||||
int randomFieldBranch = randomInt(3);
|
||||
switch (randomFieldBranch) {
|
||||
case 0:
|
||||
factory.field(field);
|
||||
break;
|
||||
case 1:
|
||||
factory.field(field);
|
||||
factory.script(new Script("_value + 1"));
|
||||
break;
|
||||
case 2:
|
||||
factory.script(new Script("doc[" + field + "] + 1"));
|
||||
break;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.missing("MISSING");
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue