Aggregations Refactor: Refactor Histogram and Date Histogram Aggregation
This commit is contained in:
parent
8c37c6f896
commit
97c2f7b037
|
@ -19,11 +19,13 @@
|
|||
package org.elasticsearch.common.rounding;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A strategy for rounding long values.
|
||||
|
@ -61,6 +63,12 @@ public abstract class Rounding implements Streamable {
|
|||
*/
|
||||
public abstract long nextRoundingValue(long value);
|
||||
|
||||
@Override
|
||||
public abstract boolean equals(Object obj);
|
||||
|
||||
@Override
|
||||
public abstract int hashCode();
|
||||
|
||||
/**
|
||||
* Rounding strategy which is based on an interval
|
||||
*
|
||||
|
@ -70,6 +78,8 @@ public abstract class Rounding implements Streamable {
|
|||
|
||||
final static byte ID = 0;
|
||||
|
||||
public static final ParseField INTERVAL_FIELD = new ParseField("interval");
|
||||
|
||||
private long interval;
|
||||
|
||||
public Interval() { // for serialization
|
||||
|
@ -126,12 +136,31 @@ public abstract class Rounding implements Streamable {
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Interval other = (Interval) obj;
|
||||
return Objects.equals(interval, other.interval);
|
||||
}
|
||||
}
|
||||
|
||||
public static class FactorRounding extends Rounding {
|
||||
|
||||
final static byte ID = 7;
|
||||
|
||||
public static final ParseField FACTOR_FIELD = new ParseField("factor");
|
||||
|
||||
private Rounding rounding;
|
||||
|
||||
private float factor;
|
||||
|
@ -166,7 +195,7 @@ public abstract class Rounding implements Streamable {
|
|||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
rounding = (TimeZoneRounding) Rounding.Streams.read(in);
|
||||
rounding = Rounding.Streams.read(in);
|
||||
factor = in.readFloat();
|
||||
}
|
||||
|
||||
|
@ -175,12 +204,32 @@ public abstract class Rounding implements Streamable {
|
|||
Rounding.Streams.write(rounding, out);
|
||||
out.writeFloat(factor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(rounding, factor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
FactorRounding other = (FactorRounding) obj;
|
||||
return Objects.equals(rounding, other.rounding)
|
||||
&& Objects.equals(factor, other.factor);
|
||||
}
|
||||
}
|
||||
|
||||
public static class OffsetRounding extends Rounding {
|
||||
|
||||
final static byte ID = 8;
|
||||
|
||||
public static final ParseField OFFSET_FIELD = new ParseField("offset");
|
||||
|
||||
private Rounding rounding;
|
||||
|
||||
private long offset;
|
||||
|
@ -224,6 +273,24 @@ public abstract class Rounding implements Streamable {
|
|||
Rounding.Streams.write(rounding, out);
|
||||
out.writeLong(offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(rounding, offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
OffsetRounding other = (OffsetRounding) obj;
|
||||
return Objects.equals(rounding, other.rounding)
|
||||
&& Objects.equals(offset, other.offset);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Streams {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.common.rounding;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -27,10 +28,13 @@ import org.joda.time.DateTimeZone;
|
|||
import org.joda.time.DurationField;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class TimeZoneRounding extends Rounding {
|
||||
public static final ParseField INTERVAL_FIELD = new ParseField("interval");
|
||||
public static final ParseField TIME_ZONE_FIELD = new ParseField("time_zone");
|
||||
|
||||
public static Builder builder(DateTimeUnit unit) {
|
||||
return new Builder(unit);
|
||||
|
@ -157,6 +161,24 @@ public abstract class TimeZoneRounding extends Rounding {
|
|||
out.writeByte(unit.id());
|
||||
out.writeString(timeZone.getID());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(unit, timeZone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TimeUnitRounding other = (TimeUnitRounding) obj;
|
||||
return Objects.equals(unit, other.unit)
|
||||
&& Objects.equals(timeZone, other.timeZone);
|
||||
}
|
||||
}
|
||||
|
||||
static class TimeIntervalRounding extends TimeZoneRounding {
|
||||
|
@ -214,5 +236,23 @@ public abstract class TimeZoneRounding extends Rounding {
|
|||
out.writeVLong(interval);
|
||||
out.writeString(timeZone.getID());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(interval, timeZone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TimeIntervalRounding other = (TimeIntervalRounding) obj;
|
||||
return Objects.equals(interval, other.interval)
|
||||
&& Objects.equals(timeZone, other.timeZone);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ public class DateHistogramBuilder extends ValuesSourceAggregationBuilder<DateHis
|
|||
}
|
||||
|
||||
if (extendedBoundsMin != null || extendedBoundsMax != null) {
|
||||
builder.startObject(DateHistogramParser.EXTENDED_BOUNDS.getPreferredName());
|
||||
builder.startObject(ExtendedBounds.EXTENDED_BOUNDS_FIELD.getPreferredName());
|
||||
if (extendedBoundsMin != null) {
|
||||
builder.field("min", extendedBoundsMin);
|
||||
}
|
||||
|
|
|
@ -19,10 +19,16 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The interval the date histogram is based on.
|
||||
*/
|
||||
public class DateHistogramInterval {
|
||||
public class DateHistogramInterval implements Writeable<DateHistogramInterval> {
|
||||
|
||||
public static final DateHistogramInterval SECOND = new DateHistogramInterval("1s");
|
||||
public static final DateHistogramInterval MINUTE = new DateHistogramInterval("1m");
|
||||
|
@ -33,6 +39,10 @@ public class DateHistogramInterval {
|
|||
public static final DateHistogramInterval QUARTER = new DateHistogramInterval("1q");
|
||||
public static final DateHistogramInterval YEAR = new DateHistogramInterval("1y");
|
||||
|
||||
public static final DateHistogramInterval readFromStream(StreamInput in) throws IOException {
|
||||
return SECOND.readFrom(in);
|
||||
}
|
||||
|
||||
public static DateHistogramInterval seconds(int sec) {
|
||||
return new DateHistogramInterval(sec + "s");
|
||||
}
|
||||
|
@ -63,4 +73,14 @@ public class DateHistogramInterval {
|
|||
public String toString() {
|
||||
return expression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateHistogramInterval readFrom(StreamInput in) throws IOException {
|
||||
return new DateHistogramInterval(in.readString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(expression);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,54 +19,25 @@
|
|||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.rounding.DateTimeUnit;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.rounding.Rounding;
|
||||
import org.elasticsearch.common.rounding.TimeZoneRounding;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class DateHistogramParser implements Aggregator.Parser {
|
||||
public class DateHistogramParser extends HistogramParser {
|
||||
|
||||
static final ParseField EXTENDED_BOUNDS = new ParseField("extended_bounds");
|
||||
static final ParseField OFFSET = new ParseField("offset");
|
||||
static final ParseField INTERVAL = new ParseField("interval");
|
||||
|
||||
public static final Map<String, DateTimeUnit> DATE_FIELD_UNITS;
|
||||
|
||||
static {
|
||||
Map<String, DateTimeUnit> dateFieldUnits = new HashMap<>();
|
||||
dateFieldUnits.put("year", DateTimeUnit.YEAR_OF_CENTURY);
|
||||
dateFieldUnits.put("1y", DateTimeUnit.YEAR_OF_CENTURY);
|
||||
dateFieldUnits.put("quarter", DateTimeUnit.QUARTER);
|
||||
dateFieldUnits.put("1q", DateTimeUnit.QUARTER);
|
||||
dateFieldUnits.put("month", DateTimeUnit.MONTH_OF_YEAR);
|
||||
dateFieldUnits.put("1M", DateTimeUnit.MONTH_OF_YEAR);
|
||||
dateFieldUnits.put("week", DateTimeUnit.WEEK_OF_WEEKYEAR);
|
||||
dateFieldUnits.put("1w", DateTimeUnit.WEEK_OF_WEEKYEAR);
|
||||
dateFieldUnits.put("day", DateTimeUnit.DAY_OF_MONTH);
|
||||
dateFieldUnits.put("1d", DateTimeUnit.DAY_OF_MONTH);
|
||||
dateFieldUnits.put("hour", DateTimeUnit.HOUR_OF_DAY);
|
||||
dateFieldUnits.put("1h", DateTimeUnit.HOUR_OF_DAY);
|
||||
dateFieldUnits.put("minute", DateTimeUnit.MINUTES_OF_HOUR);
|
||||
dateFieldUnits.put("1m", DateTimeUnit.MINUTES_OF_HOUR);
|
||||
dateFieldUnits.put("second", DateTimeUnit.SECOND_OF_MINUTE);
|
||||
dateFieldUnits.put("1s", DateTimeUnit.SECOND_OF_MINUTE);
|
||||
DATE_FIELD_UNITS = unmodifiableMap(dateFieldUnits);
|
||||
public DateHistogramParser() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,127 +46,47 @@ public class DateHistogramParser implements Aggregator.Parser {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
|
||||
ValuesSourceParser vsParser = ValuesSourceParser.numeric(aggregationName, InternalDateHistogram.TYPE, context)
|
||||
.targetValueType(ValueType.DATE)
|
||||
.formattable(true)
|
||||
.timezoneAware(true)
|
||||
.build();
|
||||
|
||||
boolean keyed = false;
|
||||
long minDocCount = 0;
|
||||
ExtendedBounds extendedBounds = null;
|
||||
InternalOrder order = (InternalOrder) Histogram.Order.KEY_ASC;
|
||||
String interval = null;
|
||||
long offset = 0;
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (vsParser.token(currentFieldName, token, parser)) {
|
||||
continue;
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if (context.parseFieldMatcher().match(currentFieldName, OFFSET)) {
|
||||
offset = parseOffset(parser.text());
|
||||
} else if (context.parseFieldMatcher().match(currentFieldName, INTERVAL)) {
|
||||
interval = parser.text();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
|
||||
if ("keyed".equals(currentFieldName)) {
|
||||
keyed = parser.booleanValue();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) {
|
||||
minDocCount = parser.longValue();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("order".equals(currentFieldName)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
String dir = parser.text();
|
||||
boolean asc = "asc".equals(dir);
|
||||
order = resolveOrder(currentFieldName, asc);
|
||||
//TODO should we throw an error if the value is not "asc" or "desc"???
|
||||
}
|
||||
}
|
||||
} else if (context.parseFieldMatcher().match(currentFieldName, EXTENDED_BOUNDS)) {
|
||||
extendedBounds = new ExtendedBounds();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if ("min".equals(currentFieldName)) {
|
||||
extendedBounds.minAsStr = parser.text();
|
||||
} else if ("max".equals(currentFieldName)) {
|
||||
extendedBounds.maxAsStr = parser.text();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown extended_bounds key for a " + token + " in aggregation ["
|
||||
+ aggregationName + "]: [" + currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if ("min".equals(currentFieldName)) {
|
||||
extendedBounds.min = parser.longValue();
|
||||
} else if ("max".equals(currentFieldName)) {
|
||||
extendedBounds.max = parser.longValue();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown extended_bounds key for a " + token + " in aggregation ["
|
||||
+ aggregationName + "]: [" + currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
|
||||
if (interval == null) {
|
||||
throw new SearchParseException(context,
|
||||
"Missing required field [interval] for histogram aggregation [" + aggregationName + "]", parser.getTokenLocation());
|
||||
}
|
||||
|
||||
TimeZoneRounding.Builder tzRoundingBuilder;
|
||||
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(interval);
|
||||
if (dateTimeUnit != null) {
|
||||
tzRoundingBuilder = TimeZoneRounding.builder(dateTimeUnit);
|
||||
} else {
|
||||
// the interval is a time value?
|
||||
tzRoundingBuilder = TimeZoneRounding.builder(TimeValue.parseTimeValue(interval, null, getClass().getSimpleName() + ".interval"));
|
||||
}
|
||||
|
||||
Rounding rounding = tzRoundingBuilder
|
||||
.timeZone(vsParser.input().timezone())
|
||||
.offset(offset).build();
|
||||
|
||||
ValuesSourceParser.Input input = vsParser.input();
|
||||
return new HistogramAggregator.DateHistogramFactory(aggregationName, input, rounding, order, keyed, minDocCount, extendedBounds,
|
||||
new InternalDateHistogram.Factory());
|
||||
|
||||
protected Object parseStringInterval(String text) {
|
||||
return new DateHistogramInterval(text);
|
||||
}
|
||||
|
||||
private static InternalOrder resolveOrder(String key, boolean asc) {
|
||||
@Override
|
||||
protected ValuesSourceAggregatorFactory<Numeric> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
|
||||
HistogramAggregator.DateHistogramFactory factory = new HistogramAggregator.DateHistogramFactory(aggregationName);
|
||||
Object interval = otherOptions.get(Rounding.Interval.INTERVAL_FIELD);
|
||||
if (interval == null) {
|
||||
throw new ParsingException(null, "Missing required field [interval] for histogram aggregation [" + aggregationName + "]");
|
||||
} else if (interval instanceof Long) {
|
||||
factory.interval((Long) interval);
|
||||
} else if (interval instanceof DateHistogramInterval) {
|
||||
factory.dateHistogramInterval((DateHistogramInterval) interval);
|
||||
}
|
||||
Long offset = (Long) otherOptions.get(Rounding.OffsetRounding.OFFSET_FIELD);
|
||||
if (offset != null) {
|
||||
factory.offset(offset);
|
||||
}
|
||||
|
||||
ExtendedBounds extendedBounds = (ExtendedBounds) otherOptions.get(ExtendedBounds.EXTENDED_BOUNDS_FIELD);
|
||||
if (extendedBounds != null) {
|
||||
factory.extendedBounds(extendedBounds);
|
||||
}
|
||||
Boolean keyed = (Boolean) otherOptions.get(HistogramAggregator.KEYED_FIELD);
|
||||
if (keyed != null) {
|
||||
factory.keyed(keyed);
|
||||
}
|
||||
Long minDocCount = (Long) otherOptions.get(HistogramAggregator.MIN_DOC_COUNT_FIELD);
|
||||
if (minDocCount != null) {
|
||||
factory.minDocCount(minDocCount);
|
||||
}
|
||||
InternalOrder order = (InternalOrder) otherOptions.get(HistogramAggregator.ORDER_FIELD);
|
||||
if (order != null) {
|
||||
factory.order(order);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
static InternalOrder resolveOrder(String key, boolean asc) {
|
||||
if ("_key".equals(key) || "_time".equals(key)) {
|
||||
return (InternalOrder) (asc ? InternalOrder.KEY_ASC : InternalOrder.KEY_DESC);
|
||||
}
|
||||
|
@ -205,7 +96,8 @@ public class DateHistogramParser implements Aggregator.Parser {
|
|||
return new InternalOrder.Aggregation(key, asc);
|
||||
}
|
||||
|
||||
private long parseOffset(String offset) throws IOException {
|
||||
@Override
|
||||
protected long parseStringOffset(String offset) throws IOException {
|
||||
if (offset.charAt(0) == '-') {
|
||||
return -TimeValue.parseTimeValue(offset.substring(1), null, getClass().getSimpleName() + ".parseOffset").millis();
|
||||
}
|
||||
|
@ -213,9 +105,8 @@ public class DateHistogramParser implements Aggregator.Parser {
|
|||
return TimeValue.parseTimeValue(offset.substring(beginIndex), null, getClass().getSimpleName() + ".parseOffset").millis();
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
return HistogramAggregator.DateHistogramFactory.PROTOTYPE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,19 +19,32 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.rounding.Rounding;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueParser;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ExtendedBounds {
|
||||
public class ExtendedBounds implements ToXContent {
|
||||
|
||||
static final ParseField EXTENDED_BOUNDS_FIELD = new ParseField("extended_bounds");
|
||||
static final ParseField MIN_FIELD = new ParseField("min");
|
||||
static final ParseField MAX_FIELD = new ParseField("max");
|
||||
|
||||
private static final ExtendedBounds PROTOTYPE = new ExtendedBounds();
|
||||
|
||||
Long min;
|
||||
Long max;
|
||||
|
@ -41,7 +54,7 @@ public class ExtendedBounds {
|
|||
|
||||
ExtendedBounds() {} //for serialization
|
||||
|
||||
ExtendedBounds(Long min, Long max) {
|
||||
public ExtendedBounds(Long min, Long max) {
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
}
|
||||
|
@ -89,4 +102,71 @@ public class ExtendedBounds {
|
|||
}
|
||||
return bounds;
|
||||
}
|
||||
|
||||
public ExtendedBounds fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher, String aggregationName)
|
||||
throws IOException {
|
||||
XContentParser.Token token = null;
|
||||
String currentFieldName = null;
|
||||
ExtendedBounds extendedBounds = new ExtendedBounds();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if ("min".equals(currentFieldName)) {
|
||||
extendedBounds.minAsStr = parser.text();
|
||||
} else if ("max".equals(currentFieldName)) {
|
||||
extendedBounds.maxAsStr = parser.text();
|
||||
} else {
|
||||
throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token
|
||||
+ " in aggregation [" + aggregationName + "]: [" + currentFieldName + "].");
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (parseFieldMatcher.match(currentFieldName, MIN_FIELD)) {
|
||||
extendedBounds.min = parser.longValue(true);
|
||||
} else if (parseFieldMatcher.match(currentFieldName, MAX_FIELD)) {
|
||||
extendedBounds.max = parser.longValue(true);
|
||||
} else {
|
||||
throw new ParsingException(parser.getTokenLocation(), "Unknown extended_bounds key for a " + token
|
||||
+ " in aggregation [" + aggregationName + "]: [" + currentFieldName + "].");
|
||||
}
|
||||
}
|
||||
}
|
||||
return extendedBounds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(EXTENDED_BOUNDS_FIELD.getPreferredName());
|
||||
if (min != null) {
|
||||
builder.field(MIN_FIELD.getPreferredName(), min);
|
||||
}
|
||||
if (max != null) {
|
||||
builder.field(MAX_FIELD.getPreferredName(), max);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(min, max);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
ExtendedBounds other = (ExtendedBounds) obj;
|
||||
return Objects.equals(min, other.min)
|
||||
&& Objects.equals(min, other.min);
|
||||
}
|
||||
|
||||
public static ExtendedBounds parse(XContentParser parser, ParseFieldMatcher parseFieldMatcher, String aggregationName)
|
||||
throws IOException {
|
||||
return PROTOTYPE.fromXContent(parser, parseFieldMatcher, aggregationName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,10 +21,18 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
|
|||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.SortedNumericDocValues;
|
||||
import org.apache.lucene.util.CollectionUtil;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.rounding.DateTimeUnit;
|
||||
import org.elasticsearch.common.rounding.Rounding;
|
||||
import org.elasticsearch.common.rounding.TimeZoneRounding;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.LongHash;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation;
|
||||
|
@ -33,21 +41,29 @@ import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
|
|||
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
public class HistogramAggregator extends BucketsAggregator {
|
||||
|
||||
public static final ParseField ORDER_FIELD = new ParseField("order");
|
||||
public static final ParseField KEYED_FIELD = new ParseField("keyed");
|
||||
public static final ParseField MIN_DOC_COUNT_FIELD = new ParseField("min_doc_count");
|
||||
|
||||
private final ValuesSource.Numeric valuesSource;
|
||||
private final ValueFormatter formatter;
|
||||
private final Rounding rounding;
|
||||
|
@ -148,37 +164,97 @@ public class HistogramAggregator extends BucketsAggregator {
|
|||
|
||||
public static class Factory extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {
|
||||
|
||||
private final Rounding rounding;
|
||||
private final InternalOrder order;
|
||||
private final boolean keyed;
|
||||
private final long minDocCount;
|
||||
private final ExtendedBounds extendedBounds;
|
||||
public static final Factory PROTOTYPE = new Factory("");
|
||||
|
||||
private long interval;
|
||||
private long offset = 0;
|
||||
private InternalOrder order = (InternalOrder) Histogram.Order.KEY_ASC;
|
||||
private boolean keyed = false;
|
||||
private long minDocCount = 0;
|
||||
private ExtendedBounds extendedBounds;
|
||||
private final InternalHistogram.Factory<?> histogramFactory;
|
||||
|
||||
public Factory(String name, ValuesSourceParser.Input<ValuesSource.Numeric> input,
|
||||
Rounding rounding, InternalOrder order, boolean keyed, long minDocCount,
|
||||
ExtendedBounds extendedBounds, InternalHistogram.Factory<?> histogramFactory) {
|
||||
public Factory(String name) {
|
||||
this(name, InternalHistogram.HISTOGRAM_FACTORY);
|
||||
}
|
||||
|
||||
super(name, histogramFactory.type(), input);
|
||||
this.rounding = rounding;
|
||||
this.order = order;
|
||||
this.keyed = keyed;
|
||||
this.minDocCount = minDocCount;
|
||||
this.extendedBounds = extendedBounds;
|
||||
private Factory(String name, InternalHistogram.Factory<?> histogramFactory) {
|
||||
super(name, histogramFactory.type(), ValuesSourceType.NUMERIC, histogramFactory.valueType());
|
||||
this.histogramFactory = histogramFactory;
|
||||
}
|
||||
|
||||
public long interval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
public void interval(long interval) {
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
public long offset() {
|
||||
return offset;
|
||||
}
|
||||
|
||||
public void offset(long offset) {
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
public Histogram.Order order() {
|
||||
return order;
|
||||
}
|
||||
|
||||
public void order(Histogram.Order order) {
|
||||
this.order = (InternalOrder) order;
|
||||
}
|
||||
|
||||
public boolean keyed() {
|
||||
return keyed;
|
||||
}
|
||||
|
||||
public void keyed(boolean keyed) {
|
||||
this.keyed = keyed;
|
||||
}
|
||||
|
||||
public long minDocCount() {
|
||||
return minDocCount;
|
||||
}
|
||||
|
||||
public void minDocCount(long minDocCount) {
|
||||
this.minDocCount = minDocCount;
|
||||
}
|
||||
|
||||
public ExtendedBounds extendedBounds() {
|
||||
return extendedBounds;
|
||||
}
|
||||
|
||||
public void extendedBounds(ExtendedBounds extendedBounds) {
|
||||
this.extendedBounds = extendedBounds;
|
||||
}
|
||||
|
||||
public InternalHistogram.Factory<?> getHistogramFactory() {
|
||||
return histogramFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) throws IOException {
|
||||
Rounding rounding = createRounding();
|
||||
return new HistogramAggregator(name, factories, rounding, order, keyed, minDocCount, extendedBounds, null, config.formatter(),
|
||||
histogramFactory, aggregationContext, parent, pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
protected Rounding createRounding() {
|
||||
if (interval < 1) {
|
||||
throw new ParsingException(null, "[interval] must be 1 or greater for histogram aggregation [" + name() + "]: " + interval);
|
||||
}
|
||||
|
||||
Rounding rounding = new Rounding.Interval(interval);
|
||||
if (offset != 0) {
|
||||
rounding = new Rounding.OffsetRounding(rounding, offset);
|
||||
}
|
||||
return rounding;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, AggregationContext aggregationContext, Aggregator parent,
|
||||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
||||
|
@ -186,8 +262,8 @@ public class HistogramAggregator extends BucketsAggregator {
|
|||
if (collectsFromSingleBucket == false) {
|
||||
return asMultiBucketAggregator(this, aggregationContext, parent);
|
||||
}
|
||||
// we need to round the bounds given by the user and we have to do
|
||||
// it for every aggregator we create
|
||||
Rounding rounding = createRounding();
|
||||
// we need to round the bounds given by the user and we have to do it for every aggregator we create
|
||||
// as the rounding is not necessarily an idempotent operation.
|
||||
// todo we need to think of a better structure to the factory/agtor
|
||||
// code so we won't need to do that
|
||||
|
@ -201,16 +277,155 @@ public class HistogramAggregator extends BucketsAggregator {
|
|||
config.formatter(), histogramFactory, aggregationContext, parent, pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
|
||||
builder.field(Rounding.Interval.INTERVAL_FIELD.getPreferredName(), interval);
|
||||
builder.field(Rounding.OffsetRounding.OFFSET_FIELD.getPreferredName(), offset);
|
||||
|
||||
if (order != null) {
|
||||
builder.field(ORDER_FIELD.getPreferredName());
|
||||
order.toXContent(builder, params);
|
||||
}
|
||||
|
||||
builder.field(KEYED_FIELD.getPreferredName(), keyed);
|
||||
|
||||
builder.field(MIN_DOC_COUNT_FIELD.getPreferredName(), minDocCount);
|
||||
|
||||
if (extendedBounds != null) {
|
||||
extendedBounds.toXContent(builder, params);
|
||||
}
|
||||
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return InternalHistogram.TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Factory innerReadFrom(String name, ValuesSourceType valuesSourceType, ValueType targetValueType, StreamInput in)
|
||||
throws IOException {
|
||||
Factory factory = createFactoryFromStream(name, in);
|
||||
factory.interval = in.readVLong();
|
||||
factory.offset = in.readVLong();
|
||||
if (in.readBoolean()) {
|
||||
factory.order = InternalOrder.Streams.readOrder(in);
|
||||
}
|
||||
factory.keyed = in.readBoolean();
|
||||
factory.minDocCount = in.readVLong();
|
||||
if (in.readBoolean()) {
|
||||
factory.extendedBounds = ExtendedBounds.readFrom(in);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
protected Factory createFactoryFromStream(String name, StreamInput in)
|
||||
throws IOException {
|
||||
return new Factory(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void innerWriteTo(StreamOutput out) throws IOException {
|
||||
writeFactoryToStream(out);
|
||||
out.writeVLong(interval);
|
||||
out.writeVLong(offset);
|
||||
boolean hasOrder = order != null;
|
||||
out.writeBoolean(hasOrder);
|
||||
if (hasOrder) {
|
||||
InternalOrder.Streams.writeOrder(order, out);
|
||||
}
|
||||
out.writeBoolean(keyed);
|
||||
out.writeVLong(minDocCount);
|
||||
boolean hasExtendedBounds = extendedBounds != null;
|
||||
out.writeBoolean(hasExtendedBounds);
|
||||
if (hasExtendedBounds) {
|
||||
extendedBounds.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
protected void writeFactoryToStream(StreamOutput out) throws IOException {
|
||||
// Default impl does nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int innerHashCode() {
|
||||
return Objects.hash(histogramFactory, interval, offset, order, keyed, minDocCount, extendedBounds);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean innerEquals(Object obj) {
|
||||
Factory other = (Factory) obj;
|
||||
return Objects.equals(histogramFactory, other.histogramFactory)
|
||||
&& Objects.equals(interval, other.interval)
|
||||
&& Objects.equals(offset, other.offset)
|
||||
&& Objects.equals(order, other.order)
|
||||
&& Objects.equals(keyed, other.keyed)
|
||||
&& Objects.equals(minDocCount, other.minDocCount)
|
||||
&& Objects.equals(extendedBounds, other.extendedBounds);
|
||||
}
|
||||
}
|
||||
|
||||
public static class DateHistogramFactory extends Factory {
|
||||
|
||||
private DateTimeZone timeZone;
|
||||
public static final DateHistogramFactory PROTOTYPE = new DateHistogramFactory("");
|
||||
public static final Map<String, DateTimeUnit> DATE_FIELD_UNITS;
|
||||
|
||||
public DateHistogramFactory(String name, ValuesSourceParser.Input<Numeric> input, Rounding rounding, InternalOrder order,
|
||||
boolean keyed, long minDocCount, ExtendedBounds extendedBounds, InternalHistogram.Factory<?> histogramFactory) {
|
||||
super(name, input, rounding, order, keyed, minDocCount, extendedBounds, histogramFactory);
|
||||
this.timeZone = input.timezone();
|
||||
static {
|
||||
Map<String, DateTimeUnit> dateFieldUnits = new HashMap<>();
|
||||
dateFieldUnits.put("year", DateTimeUnit.YEAR_OF_CENTURY);
|
||||
dateFieldUnits.put("1y", DateTimeUnit.YEAR_OF_CENTURY);
|
||||
dateFieldUnits.put("quarter", DateTimeUnit.QUARTER);
|
||||
dateFieldUnits.put("1q", DateTimeUnit.QUARTER);
|
||||
dateFieldUnits.put("month", DateTimeUnit.MONTH_OF_YEAR);
|
||||
dateFieldUnits.put("1M", DateTimeUnit.MONTH_OF_YEAR);
|
||||
dateFieldUnits.put("week", DateTimeUnit.WEEK_OF_WEEKYEAR);
|
||||
dateFieldUnits.put("1w", DateTimeUnit.WEEK_OF_WEEKYEAR);
|
||||
dateFieldUnits.put("day", DateTimeUnit.DAY_OF_MONTH);
|
||||
dateFieldUnits.put("1d", DateTimeUnit.DAY_OF_MONTH);
|
||||
dateFieldUnits.put("hour", DateTimeUnit.HOUR_OF_DAY);
|
||||
dateFieldUnits.put("1h", DateTimeUnit.HOUR_OF_DAY);
|
||||
dateFieldUnits.put("minute", DateTimeUnit.MINUTES_OF_HOUR);
|
||||
dateFieldUnits.put("1m", DateTimeUnit.MINUTES_OF_HOUR);
|
||||
dateFieldUnits.put("second", DateTimeUnit.SECOND_OF_MINUTE);
|
||||
dateFieldUnits.put("1s", DateTimeUnit.SECOND_OF_MINUTE);
|
||||
DATE_FIELD_UNITS = unmodifiableMap(dateFieldUnits);
|
||||
}
|
||||
|
||||
private DateHistogramInterval dateHistogramInterval;
|
||||
|
||||
public DateHistogramFactory(String name) {
|
||||
super(name, InternalDateHistogram.HISTOGRAM_FACTORY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the interval.
|
||||
*/
|
||||
public void dateHistogramInterval(DateHistogramInterval dateHistogramInterval) {
|
||||
this.dateHistogramInterval = dateHistogramInterval;
|
||||
}
|
||||
|
||||
public DateHistogramInterval dateHistogramInterval() {
|
||||
return dateHistogramInterval;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Rounding createRounding() {
|
||||
TimeZoneRounding.Builder tzRoundingBuilder;
|
||||
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
|
||||
if (dateTimeUnit != null) {
|
||||
tzRoundingBuilder = TimeZoneRounding.builder(dateTimeUnit);
|
||||
} else {
|
||||
// the interval is a time value?
|
||||
tzRoundingBuilder = TimeZoneRounding.builder(TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, getClass()
|
||||
.getSimpleName() + ".interval"));
|
||||
}
|
||||
if (timeZone() != null) {
|
||||
tzRoundingBuilder.timeZone(timeZone());
|
||||
}
|
||||
Rounding rounding = tzRoundingBuilder.offset(offset()).build();
|
||||
return rounding;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -226,5 +441,41 @@ public class HistogramAggregator extends BucketsAggregator {
|
|||
return super
|
||||
.doCreateInternal(valuesSource, aggregationContext, parent, collectsFromSingleBucket, pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return InternalDateHistogram.TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Factory createFactoryFromStream(String name, StreamInput in)
|
||||
throws IOException {
|
||||
DateHistogramFactory factory = new DateHistogramFactory(name);
|
||||
if (in.readBoolean()) {
|
||||
factory.dateHistogramInterval = DateHistogramInterval.readFromStream(in);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeFactoryToStream(StreamOutput out) throws IOException {
|
||||
boolean hasDateInterval = dateHistogramInterval != null;
|
||||
out.writeBoolean(hasDateInterval);
|
||||
if (hasDateInterval) {
|
||||
dateHistogramInterval.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int innerHashCode() {
|
||||
return Objects.hash(super.innerHashCode(), dateHistogramInterval);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean innerEquals(Object obj) {
|
||||
DateHistogramFactory other = (DateHistogramFactory) obj;
|
||||
return super.innerEquals(obj)
|
||||
&& Objects.equals(dateHistogramInterval, other.dateHistogramInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -119,7 +119,7 @@ public class HistogramBuilder extends ValuesSourceAggregationBuilder<HistogramBu
|
|||
}
|
||||
|
||||
if (extendedBoundsMin != null || extendedBoundsMax != null) {
|
||||
builder.startObject(HistogramParser.EXTENDED_BOUNDS.getPreferredName());
|
||||
builder.startObject(ExtendedBounds.EXTENDED_BOUNDS_FIELD.getPreferredName());
|
||||
if (extendedBoundsMin != null) {
|
||||
builder.field("min", extendedBoundsMin);
|
||||
}
|
||||
|
|
|
@ -19,24 +19,33 @@
|
|||
package org.elasticsearch.search.aggregations.bucket.histogram;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.rounding.Rounding;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.AbstractValuesSourceParser.NumericValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueParser;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Parses the histogram request
|
||||
*/
|
||||
public class HistogramParser implements Aggregator.Parser {
|
||||
public class HistogramParser extends NumericValuesSourceParser {
|
||||
|
||||
static final ParseField EXTENDED_BOUNDS = new ParseField("extended_bounds");
|
||||
public HistogramParser() {
|
||||
super(true, true, false);
|
||||
}
|
||||
|
||||
protected HistogramParser(boolean timezoneAware) {
|
||||
super(true, true, timezoneAware);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
|
@ -44,100 +53,105 @@ public class HistogramParser implements Aggregator.Parser {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
protected ValuesSourceAggregatorFactory<Numeric> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
|
||||
HistogramAggregator.Factory factory = new HistogramAggregator.Factory(aggregationName);
|
||||
Long interval = (Long) otherOptions.get(Rounding.Interval.INTERVAL_FIELD);
|
||||
if (interval == null) {
|
||||
throw new ParsingException(null, "Missing required field [interval] for histogram aggregation [" + aggregationName + "]");
|
||||
} else {
|
||||
factory.interval(interval);
|
||||
}
|
||||
Long offset = (Long) otherOptions.get(Rounding.OffsetRounding.OFFSET_FIELD);
|
||||
if (offset != null) {
|
||||
factory.offset(offset);
|
||||
}
|
||||
|
||||
ValuesSourceParser vsParser = ValuesSourceParser.numeric(aggregationName, InternalHistogram.TYPE, context)
|
||||
.targetValueType(ValueType.NUMERIC)
|
||||
.formattable(true)
|
||||
.build();
|
||||
ExtendedBounds extendedBounds = (ExtendedBounds) otherOptions.get(ExtendedBounds.EXTENDED_BOUNDS_FIELD);
|
||||
if (extendedBounds != null) {
|
||||
factory.extendedBounds(extendedBounds);
|
||||
}
|
||||
Boolean keyed = (Boolean) otherOptions.get(HistogramAggregator.KEYED_FIELD);
|
||||
if (keyed != null) {
|
||||
factory.keyed(keyed);
|
||||
}
|
||||
Long minDocCount = (Long) otherOptions.get(HistogramAggregator.MIN_DOC_COUNT_FIELD);
|
||||
if (minDocCount != null) {
|
||||
factory.minDocCount(minDocCount);
|
||||
}
|
||||
InternalOrder order = (InternalOrder) otherOptions.get(HistogramAggregator.ORDER_FIELD);
|
||||
if (order != null) {
|
||||
factory.order(order);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
boolean keyed = false;
|
||||
long minDocCount = 0;
|
||||
InternalOrder order = (InternalOrder) InternalOrder.KEY_ASC;
|
||||
long interval = -1;
|
||||
ExtendedBounds extendedBounds = null;
|
||||
long offset = 0;
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (vsParser.token(currentFieldName, token, parser)) {
|
||||
continue;
|
||||
} else if (token.isValue()) {
|
||||
if ("interval".equals(currentFieldName)) {
|
||||
interval = parser.longValue();
|
||||
} else if ("min_doc_count".equals(currentFieldName) || "minDocCount".equals(currentFieldName)) {
|
||||
minDocCount = parser.longValue();
|
||||
} else if ("keyed".equals(currentFieldName)) {
|
||||
keyed = parser.booleanValue();
|
||||
} else if ("offset".equals(currentFieldName)) {
|
||||
offset = parser.longValue();
|
||||
@Override
|
||||
protected boolean token(String aggregationName, String currentFieldName, Token token, XContentParser parser, ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions)
|
||||
throws IOException {
|
||||
if (token.isValue()) {
|
||||
if (parseFieldMatcher.match(currentFieldName, Rounding.Interval.INTERVAL_FIELD)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
otherOptions.put(Rounding.Interval.INTERVAL_FIELD, parseStringInterval(parser.text()));
|
||||
return true;
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in aggregation [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
otherOptions.put(Rounding.Interval.INTERVAL_FIELD, parser.longValue());
|
||||
return true;
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("order".equals(currentFieldName)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
String dir = parser.text();
|
||||
boolean asc = "asc".equals(dir);
|
||||
if (!asc && !"desc".equals(dir)) {
|
||||
throw new SearchParseException(context, "Unknown order direction [" + dir + "] in aggregation ["
|
||||
+ aggregationName + "]. Should be either [asc] or [desc]", parser.getTokenLocation());
|
||||
}
|
||||
order = resolveOrder(currentFieldName, asc);
|
||||
}
|
||||
}
|
||||
} else if (context.parseFieldMatcher().match(currentFieldName, EXTENDED_BOUNDS)) {
|
||||
extendedBounds = new ExtendedBounds();
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if ("min".equals(currentFieldName)) {
|
||||
extendedBounds.min = parser.longValue(true);
|
||||
} else if ("max".equals(currentFieldName)) {
|
||||
extendedBounds.max = parser.longValue(true);
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown extended_bounds key for a " + token + " in aggregation ["
|
||||
+ aggregationName + "]: [" + currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if (parseFieldMatcher.match(currentFieldName, HistogramAggregator.MIN_DOC_COUNT_FIELD)) {
|
||||
otherOptions.put(HistogramAggregator.MIN_DOC_COUNT_FIELD, parser.longValue());
|
||||
return true;
|
||||
} else if (parseFieldMatcher.match(currentFieldName, HistogramAggregator.KEYED_FIELD)) {
|
||||
otherOptions.put(HistogramAggregator.KEYED_FIELD, parser.booleanValue());
|
||||
return true;
|
||||
} else if (parseFieldMatcher.match(currentFieldName, Rounding.OffsetRounding.OFFSET_FIELD)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
otherOptions.put(Rounding.OffsetRounding.OFFSET_FIELD, parseStringOffset(parser.text()));
|
||||
return true;
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in aggregation [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
otherOptions.put(Rounding.OffsetRounding.OFFSET_FIELD, parser.longValue());
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in aggregation [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
return false;
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if (parseFieldMatcher.match(currentFieldName, HistogramAggregator.ORDER_FIELD)) {
|
||||
InternalOrder order = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
String dir = parser.text();
|
||||
boolean asc = "asc".equals(dir);
|
||||
if (!asc && !"desc".equals(dir)) {
|
||||
throw new ParsingException(parser.getTokenLocation(), "Unknown order direction in aggregation ["
|
||||
+ aggregationName + "]: [" + dir
|
||||
+ "]. Should be either [asc] or [desc]");
|
||||
}
|
||||
order = resolveOrder(currentFieldName, asc);
|
||||
}
|
||||
}
|
||||
otherOptions.put(HistogramAggregator.ORDER_FIELD, order);
|
||||
return true;
|
||||
} else if (parseFieldMatcher.match(currentFieldName, ExtendedBounds.EXTENDED_BOUNDS_FIELD)) {
|
||||
ExtendedBounds extendedBounds = ExtendedBounds.parse(parser, parseFieldMatcher, aggregationName);
|
||||
otherOptions.put(ExtendedBounds.EXTENDED_BOUNDS_FIELD, extendedBounds);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (interval < 1) {
|
||||
throw new SearchParseException(context,
|
||||
"Missing required field [interval] for histogram aggregation [" + aggregationName + "]", parser.getTokenLocation());
|
||||
}
|
||||
|
||||
Rounding rounding = new Rounding.Interval(interval);
|
||||
if (offset != 0) {
|
||||
rounding = new Rounding.OffsetRounding(rounding, offset);
|
||||
}
|
||||
|
||||
if (extendedBounds != null) {
|
||||
// with numeric histogram, we can process here and fail fast if the bounds are invalid
|
||||
extendedBounds.processAndValidate(aggregationName, context, ValueParser.RAW);
|
||||
}
|
||||
|
||||
return new HistogramAggregator.Factory(aggregationName, vsParser.input(), rounding, order, keyed, minDocCount, extendedBounds,
|
||||
new InternalHistogram.Factory());
|
||||
protected Object parseStringInterval(String interval) {
|
||||
return Long.valueOf(interval);
|
||||
}
|
||||
|
||||
protected long parseStringOffset(String offset) throws IOException {
|
||||
return Long.valueOf(offset);
|
||||
}
|
||||
|
||||
static InternalOrder resolveOrder(String key, boolean asc) {
|
||||
|
@ -150,9 +164,8 @@ public class HistogramParser implements Aggregator.Parser {
|
|||
return new InternalOrder.Aggregation(key, asc);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
return HistogramAggregator.Factory.PROTOTYPE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.histogram;
|
|||
import org.elasticsearch.search.aggregations.AggregationExecutionException;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
|
||||
import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
@ -30,6 +31,7 @@ import org.joda.time.DateTimeZone;
|
|||
*/
|
||||
public class InternalDateHistogram {
|
||||
|
||||
public static final Factory HISTOGRAM_FACTORY = new Factory();
|
||||
final static Type TYPE = new Type("date_histogram", "dhisto");
|
||||
|
||||
static class Bucket extends InternalHistogram.Bucket {
|
||||
|
@ -69,6 +71,11 @@ public class InternalDateHistogram {
|
|||
return TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueType valueType() {
|
||||
return ValueType.DATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalDateHistogram.Bucket createBucket(InternalAggregations aggregations, InternalDateHistogram.Bucket prototype) {
|
||||
return new Bucket(prototype.key, prototype.docCount, aggregations, prototype.getKeyed(), prototype.formatter, this);
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
|||
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
|
||||
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.ValueType;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
|
||||
|
||||
|
@ -51,6 +52,7 @@ import java.util.Map;
|
|||
public class InternalHistogram<B extends InternalHistogram.Bucket> extends InternalMultiBucketAggregation<InternalHistogram, B> implements
|
||||
Histogram {
|
||||
|
||||
public static final Factory<Bucket> HISTOGRAM_FACTORY = new Factory<Bucket>();
|
||||
final static Type TYPE = new Type("histogram", "histo");
|
||||
|
||||
private final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
|
||||
|
@ -239,6 +241,10 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
return TYPE.name();
|
||||
}
|
||||
|
||||
public ValueType valueType() {
|
||||
return ValueType.NUMERIC;
|
||||
}
|
||||
|
||||
public InternalHistogram<B> create(String name, List<B> buckets, InternalOrder order, long minDocCount,
|
||||
EmptyBucketInfo emptyBucketInfo, ValueFormatter formatter, boolean keyed,
|
||||
List<PipelineAggregator> pipelineAggregators,
|
||||
|
@ -505,7 +511,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <B extends InternalHistogram.Bucket> Factory<B> resolveFactory(String factoryType) {
|
||||
protected static <B extends InternalHistogram.Bucket> Factory<B> resolveFactory(String factoryType) {
|
||||
if (factoryType.equals(InternalDateHistogram.TYPE.name())) {
|
||||
return (Factory<B>) new InternalDateHistogram.Factory();
|
||||
} else if (factoryType.equals(TYPE.name())) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Comparator;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* An internal {@link Histogram.Order} strategy which is identified by a unique id.
|
||||
|
@ -64,6 +65,25 @@ class InternalOrder extends Histogram.Order {
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder.startObject().field(key, asc ? "asc" : "desc").endObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, key, asc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
InternalOrder other = (InternalOrder) obj;
|
||||
return Objects.equals(id, other.id)
|
||||
&& Objects.equals(key, other.key)
|
||||
&& Objects.equals(asc, other.asc);
|
||||
}
|
||||
|
||||
static class Aggregation extends InternalOrder {
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.common.rounding.DateTimeUnit;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramParser;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
|
@ -104,7 +104,7 @@ public class DerivativeParser implements PipelineAggregator.Parser {
|
|||
|
||||
Long xAxisUnits = null;
|
||||
if (units != null) {
|
||||
DateTimeUnit dateTimeUnit = DateHistogramParser.DATE_FIELD_UNITS.get(units);
|
||||
DateTimeUnit dateTimeUnit = HistogramAggregator.DateHistogramFactory.DATE_FIELD_UNITS.get(units);
|
||||
if (dateTimeUnit != null) {
|
||||
xAxisUnits = dateTimeUnit.field().getDurationField().getUnitMillis();
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.bucket;
|
||||
|
||||
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator.Factory;
|
||||
|
||||
public class HistogramTests extends BaseAggregationTestCase<HistogramAggregator.Factory> {
|
||||
|
||||
@Override
|
||||
protected Factory createTestAggregatorFactory() {
|
||||
Factory factory = new Factory("foo");
|
||||
factory.field(INT_FIELD_NAME);
|
||||
factory.interval(randomIntBetween(1, 100000));
|
||||
if (randomBoolean()) {
|
||||
long extendedBoundsMin = randomIntBetween(-100000, 100000);
|
||||
long extendedBoundsMax = randomIntBetween((int) extendedBoundsMin, 200000);
|
||||
factory.extendedBounds(new ExtendedBounds(extendedBoundsMin, extendedBoundsMax));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.format("###.##");
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.keyed(randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.minDocCount(randomIntBetween(0, 100));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.missing(randomIntBetween(0, 10));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.offset(randomIntBetween(0, 100000));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
int branch = randomInt(5);
|
||||
switch (branch) {
|
||||
case 0:
|
||||
factory.order(Order.COUNT_ASC);
|
||||
break;
|
||||
case 1:
|
||||
factory.order(Order.COUNT_DESC);
|
||||
break;
|
||||
case 2:
|
||||
factory.order(Order.KEY_ASC);
|
||||
break;
|
||||
case 3:
|
||||
factory.order(Order.KEY_DESC);
|
||||
break;
|
||||
case 4:
|
||||
factory.order(Order.aggregation("foo", true));
|
||||
break;
|
||||
case 5:
|
||||
factory.order(Order.aggregation("foo", false));
|
||||
break;
|
||||
}
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
}
|
|
@ -1029,7 +1029,7 @@ public class HistogramTests extends ESIntegTestCase {
|
|||
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(-1).minDocCount(0)).execute().actionGet();
|
||||
fail();
|
||||
} catch (SearchPhaseExecutionException e) {
|
||||
assertThat(e.toString(), containsString("Missing required field [interval]"));
|
||||
assertThat(e.toString(), containsString("[interval] must be 1 or greater for histogram aggregation [histo]"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue