Adds the ability to specify a format on composite date_histogram source (#28310)

This commit adds the ability to specify a date format on the `date_histogram` composite source.
If the format is defined, the key for the source is returned as a formatted date.

Closes #27923
This commit is contained in:
Jim Ferenczi 2018-01-23 15:14:49 +01:00 committed by GitHub
parent d31e964a86
commit 19cfc25873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 401 additions and 76 deletions

View File

@ -225,7 +225,41 @@ Note that fractional time values are not supported, but you can address this by
time unit (e.g., `1.5h` could instead be specified as `90m`).
[float]
===== Time Zone
====== Format
Internally, a date is represented as a 64-bit number: a timestamp in milliseconds-since-the-epoch.
By default, these timestamps are returned as the bucket keys. To return a formatted date string instead,
specify a date format with the `format` parameter:
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{
"date": {
"date_histogram" : {
"field": "timestamp",
"interval": "1d",
"format": "yyyy-MM-dd" <1>
}
}
}
]
}
}
}
}
--------------------------------------------------
// CONSOLE
<1> Supports expressive date <<date-format-pattern,format pattern>>
[float]
====== Time Zone
Date-times are stored in Elasticsearch in UTC. By default, all bucketing and
rounding is also done in UTC. The `time_zone` parameter can be used to indicate

View File

@ -7,6 +7,8 @@ setup:
mappings:
doc:
properties:
date:
type: date
keyword:
type: keyword
long:
@ -40,6 +42,20 @@ setup:
id: 4
body: { "keyword": "bar", "long": [1000, 0] }
- do:
index:
index: test
type: doc
id: 5
body: { "date": "2017-10-20T03:08:45" }
- do:
index:
index: test
type: doc
id: 6
body: { "date": "2017-10-21T07:00:00" }
- do:
indices.refresh:
index: [test]
@ -66,7 +82,7 @@ setup:
}
]
- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 3 }
@ -104,7 +120,7 @@ setup:
}
]
- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 5 }
- match: { aggregations.test.buckets.0.key.long: 0}
- match: { aggregations.test.buckets.0.key.kw: "bar" }
@ -154,7 +170,7 @@ setup:
]
after: { "long": 20, "kw": "foo" }
- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.long: 100 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
@ -188,7 +204,7 @@ setup:
]
after: { "kw": "delta" }
- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.kw: "foo" }
- match: { aggregations.test.buckets.0.doc_count: 2 }
@ -220,3 +236,62 @@ setup:
}
}
]
---
"Composite aggregation with format":
- skip:
version: " - 6.99.99"
reason: this uses a new option (format) added in 7.0.0
- do:
search:
index: test
body:
aggregations:
test:
composite:
sources: [
{
"date": {
"date_histogram": {
"field": "date",
"interval": "1d",
"format": "yyyy-MM-dd"
}
}
}
]
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-20" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
- match: { aggregations.test.buckets.1.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.1.doc_count: 1 }
- do:
search:
index: test
body:
aggregations:
test:
composite:
after: {
date: "2017-10-20"
}
sources: [
{
"date": {
"date_histogram": {
"field": "date",
"interval": "1d",
"format": "yyyy-MM-dd"
}
}
}
]
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }

View File

@ -147,17 +147,15 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
}
List<String> sourceNames = new ArrayList<>();
for (int i = 0; i < configs.length; i++) {
configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
sourceNames.add(sources.get(i).name());
if (configs[i].valuesSource().needsScores()) {
throw new IllegalArgumentException("[sources] cannot access _score");
}
}
final CompositeKey afterKey;
if (after != null) {
if (after.size() != sources.size()) {
if (after.size() != configs.length) {
throw new IllegalArgumentException("[after] has " + after.size() +
" value(s) but [sources] has " + sources.size());
}
@ -179,7 +177,7 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
} else {
afterKey = null;
}
return new CompositeAggregationFactory(name, context, parent, subfactoriesBuilder, metaData, size, configs, sourceNames, afterKey);
return new CompositeAggregationFactory(name, context, parent, subfactoriesBuilder, metaData, size, configs, afterKey);
}

View File

@ -32,17 +32,14 @@ import java.util.Map;
class CompositeAggregationFactory extends AggregatorFactory<CompositeAggregationFactory> {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final CompositeKey afterKey;
CompositeAggregationFactory(String name, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources,
List<String> sourceNames, CompositeKey afterKey) throws IOException {
int size, CompositeValuesSourceConfig[] sources, CompositeKey afterKey) throws IOException {
super(name, context, parent, subFactoriesBuilder, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
this.afterKey = afterKey;
}
@ -50,6 +47,6 @@ class CompositeAggregationFactory extends AggregatorFactory<CompositeAggregation
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new CompositeAggregator(name, factories, context, parent, pipelineAggregators, metaData,
size, sources, sourceNames, afterKey);
size, sources, afterKey);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -43,11 +44,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;
private final boolean canEarlyTerminate;
private final TreeMap<Integer, Integer> keys;
@ -59,12 +62,12 @@ final class CompositeAggregator extends BucketsAggregator {
CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources, List<String> sourceNames,
CompositeKey rawAfterKey) throws IOException {
int size, CompositeValuesSourceConfig[] sources, CompositeKey rawAfterKey) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
this.sourceNames = Arrays.stream(sources).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.formats = Arrays.stream(sources).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
// we use slot 0 to fill the current document (size+1).
this.array = new CompositeValuesComparator(context.searcher().getIndexReader(), sources, size+1);
if (rawAfterKey != null) {
@ -131,15 +134,17 @@ final class CompositeAggregator extends BucketsAggregator {
CompositeKey key = array.toCompositeKey(slot);
InternalAggregations aggs = bucketAggregations(slot);
int docCount = bucketDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, key, reverseMuls, docCount, aggs);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
return new InternalComposite(name, size, sourceNames, Arrays.asList(buckets), reverseMuls, pipelineAggregators(), metaData());
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), reverseMuls,
pipelineAggregators(), metaData());
}
@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, Collections.emptyList(), reverseMuls, pipelineAggregators(), metaData());
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), reverseMuls,
pipelineAggregators(), metaData());
}
@Override

View File

@ -56,7 +56,7 @@ final class CompositeValuesComparator {
if (vs.isFloatingPoint()) {
arrays[i] = CompositeValuesSource.wrapDouble(vs, size, reverseMul);
} else {
arrays[i] = CompositeValuesSource.wrapLong(vs, size, reverseMul);
arrays[i] = CompositeValuesSource.wrapLong(vs, sources[i].format(), size, reverseMul);
}
}
}

View File

@ -23,8 +23,10 @@ import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
@ -96,8 +98,9 @@ abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparab
/**
* Creates a {@link CompositeValuesSource} that generates long values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, int size, int reverseMul) {
return new LongValuesSource(vs, size, reverseMul);
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, DocValueFormat format,
int size, int reverseMul) {
return new LongValuesSource(vs, format, size, reverseMul);
}
/**
@ -273,9 +276,12 @@ abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparab
*/
private static class LongValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Long> {
private final long[] values;
// handles "format" for date histogram source
private final DocValueFormat format;
LongValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) {
LongValuesSource(ValuesSource.Numeric vs, DocValueFormat format, int size, int reverseMul) {
super(vs, size, reverseMul);
this.format = format;
this.values = new long[size];
}
@ -304,7 +310,11 @@ abstract class CompositeValuesSource<VS extends ValuesSource, T extends Comparab
if (value instanceof Number) {
topValue = ((Number) value).longValue();
} else {
topValue = Long.parseLong(value.toString());
// for date histogram source with "format", the after value is formatted
// as a string so we need to retrieve the original value in milliseconds.
topValue = format.parseLong(value.toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.SortField;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -51,6 +52,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
private ValueType valueType = null;
private Object missing = null;
private SortOrder order = SortOrder.ASC;
private String format = null;
CompositeValuesSourceBuilder(String name) {
this(name, null);
@ -72,6 +74,11 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
}
this.missing = in.readGenericValue();
this.order = SortOrder.readFromStream(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
this.format = in.readOptionalString();
} else {
this.format = null;
}
}
@Override
@ -90,6 +97,9 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
}
out.writeGenericValue(missing);
order.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalString(format);
}
innerWriteTo(out);
}
@ -112,6 +122,9 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
if (valueType != null) {
builder.field("value_type", valueType.getPreferredName());
}
if (format != null) {
builder.field("format", format);
}
builder.field("order", order);
doXContentBody(builder, params);
builder.endObject();
@ -120,7 +133,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
@Override
public final int hashCode() {
return Objects.hash(field, missing, script, valueType, order, innerHashCode());
return Objects.hash(field, missing, script, valueType, order, format, innerHashCode());
}
protected abstract int innerHashCode();
@ -137,6 +150,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
Objects.equals(valueType, that.valueType()) &&
Objects.equals(missing, that.missing()) &&
Objects.equals(order, that.order()) &&
Objects.equals(format, that.format()) &&
innerEquals(that);
}
@ -254,6 +268,24 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
return order;
}
/**
 * Sets the format to use for the output of the aggregation.
 *
 * @param format the format to use, must not be {@code null}
 * @return this builder, to allow chaining
 * @throws IllegalArgumentException if {@code format} is {@code null}
 */
// The cast to AB below is unchecked; it relies on AB being the type of this builder.
@SuppressWarnings("unchecked")
public AB format(String format) {
    if (format == null) {
        throw new IllegalArgumentException("[format] must not be null: [" + name + "]");
    }
    this.format = format;
    return (AB) this;
}
/**
 * Gets the format to use for the output of the aggregation.
 *
 * @return the format set on this builder, or {@code null} if no format has been set
 */
public String format() {
return format;
}
/**
* Creates a {@link CompositeValuesSourceConfig} for this source.
*
@ -271,7 +303,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
public final CompositeValuesSourceConfig build(SearchContext context, int pos, int numPos, SortField sortField) throws IOException {
ValuesSourceConfig<?> config = ValuesSourceConfig.resolve(context.getQueryShardContext(),
valueType, field, script, missing, null, null);
valueType, field, script, missing, null, format);
return innerBuild(context, config, pos, numPos, sortField);
}

View File

@ -19,30 +19,47 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
class CompositeValuesSourceConfig {
private final String name;
private final ValuesSource vs;
private final DocValueFormat format;
private final int reverseMul;
private final boolean canEarlyTerminate;
CompositeValuesSourceConfig(String name, ValuesSource vs, SortOrder order, boolean canEarlyTerminate) {
CompositeValuesSourceConfig(String name, ValuesSource vs, DocValueFormat format, SortOrder order, boolean canEarlyTerminate) {
this.name = name;
this.vs = vs;
this.format = format;
this.canEarlyTerminate = canEarlyTerminate;
this.reverseMul = order == SortOrder.ASC ? 1 : -1;
}
/**
* Returns the name associated with this configuration.
*/
String name() {
return name;
}
/**
* Returns the {@link ValuesSource} for this configuration.
*/
ValuesSource valuesSource() {
return vs;
}
/**
* The {@link DocValueFormat} to use for formatting the keys.
* {@link DocValueFormat#RAW} means no formatting.
*/
DocValueFormat format() {
return format;
}
/**
* The sort order for the values source (e.g. -1 for descending and 1 for ascending).
*/
@ -51,6 +68,9 @@ class CompositeValuesSourceConfig {
return reverseMul;
}
/**
* Returns whether this {@link ValuesSource} is used to sort the index.
*/
boolean canEarlyTerminate() {
return canEarlyTerminate;
}

View File

@ -30,6 +30,8 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.FieldContext;
@ -46,8 +48,8 @@ import java.util.Objects;
import static org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder.DATE_FIELD_UNITS;
/**
* A {@link CompositeValuesSourceBuilder} that that builds a {@link RoundingValuesSource} from a {@link Script} or
* a field name.
* A {@link CompositeValuesSourceBuilder} that builds a {@link RoundingValuesSource} from a {@link Script} or
* a field name using the provided interval.
*/
public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<DateHistogramValuesSourceBuilder> {
static final String TYPE = "date_histogram";
@ -55,6 +57,7 @@ public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuild
private static final ObjectParser<DateHistogramValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
PARSER.declareString(DateHistogramValuesSourceBuilder::format, new ParseField("format"));
PARSER.declareField((histogram, interval) -> {
if (interval instanceof Long) {
histogram.interval((long) interval);
@ -235,7 +238,11 @@ public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuild
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
// dates are returned as a timestamp in milliseconds-since-the-epoch unless a specific date format
// is specified in the builder.
final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
return new CompositeValuesSourceConfig(name, vs, docValueFormat,
order(), canEarlyTerminate);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -37,7 +37,7 @@ import java.io.IOException;
import java.util.Objects;
/**
* A {@link CompositeValuesSourceBuilder} that that builds a {@link HistogramValuesSource} from another numeric values source
* A {@link CompositeValuesSourceBuilder} that builds a {@link HistogramValuesSource} from another numeric values source
* using the provided interval.
*/
public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<HistogramValuesSourceBuilder> {
@ -128,7 +128,7 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}

View File

@ -20,9 +20,11 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -35,6 +37,7 @@ import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -49,11 +52,14 @@ public class InternalComposite
private final List<InternalBucket> buckets;
private final int[] reverseMuls;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;
InternalComposite(String name, int size, List<String> sourceNames, List<InternalBucket> buckets, int[] reverseMuls,
InternalComposite(String name, int size, List<String> sourceNames, List<DocValueFormat> formats,
List<InternalBucket> buckets, int[] reverseMuls,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.sourceNames = sourceNames;
this.formats = formats;
this.buckets = buckets;
this.size = size;
this.reverseMuls = reverseMuls;
@ -63,14 +69,27 @@ public class InternalComposite
super(in);
this.size = in.readVInt();
this.sourceNames = in.readList(StreamInput::readString);
this.formats = new ArrayList<>(sourceNames.size());
for (int i = 0; i < sourceNames.size(); i++) {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
formats.add(in.readNamedWriteable(DocValueFormat.class));
} else {
formats.add(DocValueFormat.RAW);
}
}
this.reverseMuls = in.readIntArray();
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, reverseMuls));
this.buckets = in.readList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls));
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(size);
out.writeStringList(sourceNames);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
for (DocValueFormat format : formats) {
out.writeNamedWriteable(format);
}
}
out.writeIntArray(reverseMuls);
out.writeList(buckets);
}
@ -87,12 +106,13 @@ public class InternalComposite
@Override
public InternalComposite create(List<InternalBucket> buckets) {
return new InternalComposite(name, size, sourceNames, buckets, reverseMuls, pipelineAggregators(), getMetaData());
return new InternalComposite(name, size, sourceNames, formats, buckets, reverseMuls, pipelineAggregators(), getMetaData());
}
@Override
public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
return new InternalBucket(prototype.sourceNames, prototype.key, prototype.reverseMuls, prototype.docCount, aggregations);
return new InternalBucket(prototype.sourceNames, prototype.formats, prototype.key, prototype.reverseMuls,
prototype.docCount, aggregations);
}
public int getSize() {
@ -149,7 +169,7 @@ public class InternalComposite
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}
return new InternalComposite(name, size, sourceNames, result, reverseMuls, pipelineAggregators(), metaData);
return new InternalComposite(name, size, sourceNames, formats, result, reverseMuls, pipelineAggregators(), metaData);
}
@Override
@ -191,18 +211,21 @@ public class InternalComposite
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient List<String> sourceNames;
private final transient List<DocValueFormat> formats;
InternalBucket(List<String> sourceNames, CompositeKey key, int[] reverseMuls, long docCount, InternalAggregations aggregations) {
InternalBucket(List<String> sourceNames, List<DocValueFormat> formats, CompositeKey key, int[] reverseMuls, long docCount,
InternalAggregations aggregations) {
this.key = key;
this.docCount = docCount;
this.aggregations = aggregations;
this.reverseMuls = reverseMuls;
this.sourceNames = sourceNames;
this.formats = formats;
}
@SuppressWarnings("unchecked")
InternalBucket(StreamInput in, List<String> sourceNames, int[] reverseMuls) throws IOException {
InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
final Comparable<?>[] values = new Comparable<?>[in.readVInt()];
for (int i = 0; i < values.length; i++) {
values[i] = (Comparable<?>) in.readGenericValue();
@ -212,6 +235,7 @@ public class InternalComposite
this.aggregations = InternalAggregations.readAggregations(in);
this.reverseMuls = reverseMuls;
this.sourceNames = sourceNames;
this.formats = formats;
}
@Override
@ -242,9 +266,11 @@ public class InternalComposite
@Override
public Map<String, Object> getKey() {
return new ArrayMap(sourceNames, key.values());
// returns the formatted key in a map
return new ArrayMap(sourceNames, formats, key.values());
}
// get the raw key (without formatting to preserve the natural order).
// visible for testing
CompositeKey getRawKey() {
return key;
@ -260,7 +286,7 @@ public class InternalComposite
}
builder.append(sourceNames.get(i));
builder.append('=');
builder.append(formatObject(key.get(i)));
builder.append(formatObject(key.get(i), formats.get(i)));
}
builder.append('}');
return builder.toString();
@ -284,7 +310,7 @@ public class InternalComposite
aggregations.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, reduceContext);
return new InternalBucket(sourceNames, key, reverseMuls, docCount, aggs);
return new InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
@Override
@ -303,26 +329,52 @@ public class InternalComposite
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
/**
* See {@link CompositeAggregation#bucketToXContentFragment}
* See {@link CompositeAggregation#bucketToXContent}
*/
throw new UnsupportedOperationException("not implemented");
}
}
static Object formatObject(Object obj) {
if (obj instanceof BytesRef) {
return ((BytesRef) obj).utf8ToString();
/**
 * Converts a raw key value into its output representation using <code>format</code>.
 * <p>
 * When the format is {@link DocValueFormat#RAW}, a {@link BytesRef} is turned into its
 * UTF-8 string and {@link Long} / {@link Double} values are returned unchanged. For any
 * other format the value is handed to the format itself. Objects of any other class are
 * returned as is.
 */
static Object formatObject(Object obj, DocValueFormat format) {
    final Class<?> type = obj.getClass();
    final boolean raw = format == DocValueFormat.RAW;
    if (type == BytesRef.class) {
        final BytesRef bytes = (BytesRef) obj;
        if (raw) {
            return bytes.utf8ToString();
        }
        return format.format(bytes);
    }
    if (type == Long.class) {
        final Long number = (Long) obj;
        if (raw) {
            return number;
        }
        return format.format(number);
    }
    if (type == Double.class) {
        final Double number = (Double) obj;
        if (raw) {
            return number;
        }
        return format.format(number);
    }
    // neither a BytesRef, a Long nor a Double: nothing to format
    return obj;
}
private static class ArrayMap extends AbstractMap<String, Object> {
final List<String> keys;
final List<DocValueFormat> formats;
final Object[] values;
ArrayMap(List<String> keys, Object[] values) {
assert keys.size() == values.length;
ArrayMap(List<String> keys, List<DocValueFormat> formats, Object[] values) {
assert keys.size() == values.length && keys.size() == formats.size();
this.keys = keys;
this.formats = formats;
this.values = values;
}
@ -335,7 +387,7 @@ public class InternalComposite
public Object get(Object key) {
for (int i = 0; i < keys.size(); i++) {
if (key.equals(keys.get(i))) {
return formatObject(values[i]);
return formatObject(values[i], formats.get(i));
}
}
return null;
@ -356,7 +408,7 @@ public class InternalComposite
@Override
public Entry<String, Object> next() {
SimpleEntry<String, Object> entry =
new SimpleEntry<>(keys.get(pos), formatObject(values[pos]));
new SimpleEntry<>(keys.get(pos), formatObject(values[pos], formats.get(pos)));
++ pos;
return entry;
}

View File

@ -95,6 +95,6 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
canEarlyTerminate = checkCanEarlyTerminate(context.searcher().getIndexReader(),
fieldContext.field(), order() == SortOrder.ASC ? false : true, sortField);
}
return new CompositeValuesSourceConfig(name, vs, order(), canEarlyTerminate);
return new CompositeValuesSourceConfig(name, vs, config.format(), order(), canEarlyTerminate);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
@ -68,6 +69,9 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
public class CompositeAggregatorTests extends AggregatorTestCase {
private static MappedFieldType[] FIELD_TYPES;
@ -761,6 +765,89 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
/**
 * Checks that a date_histogram source configured with the "yyyy-MM-dd" format
 * reports its bucket keys as formatted dates, and that a formatted date can be
 * used as the "after" key to resume the aggregation.
 */
public void testWithDateHistogramAndFormat() throws IOException {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
createDocument("date", asLong("2017-10-20T03:08:45")),
createDocument("date", asLong("2016-09-20T09:00:34")),
createDocument("date", asLong("2016-09-20T11:34:00")),
createDocument("date", asLong("2017-10-20T06:09:24")),
createDocument("date", asLong("2017-10-19T06:09:24")),
// document without a "date" value: the doc counts asserted below only add up to the five dated documents
createDocument("long", 4L)
)
);
final Sort sort = new Sort(new SortedNumericSortField("date", SortField.Type.LONG));
// first pass: no after key, three daily buckets are expected with formatted keys
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
.dateHistogramInterval(DateHistogramInterval.days(1))
.format("yyyy-MM-dd");
return new CompositeAggregationBuilder("name", Collections.singletonList(histo));
},
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=2016-09-20}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=2017-10-19}", result.getBuckets().get(1).getKeyAsString());
assertEquals(1L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=2017-10-20}", result.getBuckets().get(2).getKeyAsString());
assertEquals(2L, result.getBuckets().get(2).getDocCount());
}
);
// second pass: the after key is given as a formatted date (the first bucket's key),
// so only the two following buckets are expected
testSearchCase(new MatchAllDocsQuery(), sort, dataset,
() -> {
DateHistogramValuesSourceBuilder histo = new DateHistogramValuesSourceBuilder("date")
.field("date")
.dateHistogramInterval(DateHistogramInterval.days(1))
.format("yyyy-MM-dd");
return new CompositeAggregationBuilder("name", Collections.singletonList(histo))
.aggregateAfter(createAfterKey("date", "2016-09-20"));
}, (result) -> {
assertEquals(2, result.getBuckets().size());
assertEquals("{date=2017-10-19}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=2017-10-20}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
}
);
}
/**
 * Verifies that unusable {@code after} values are rejected for a {@code date_histogram} source that
 * declares the {@code yyyy-MM-dd} format: first the {@code now} expression, then a raw epoch-millis string.
 */
public void testThatDateHistogramFailsFormatAfter() throws IOException {
    assertAfterKeyRejected("now", "now() is not supported in [after] key");
    assertAfterKeyRejected("1474329600000", "Parse failure");
}

/**
 * Runs a composite aggregation over an empty dataset with a single daily {@code date_histogram} source
 * formatted as {@code yyyy-MM-dd}, using {@code afterValue} as the {@code after} key for that source.
 * Expects an {@link ElasticsearchParseException} whose cause is an {@link IllegalArgumentException}
 * with a message containing {@code expectedMessage}.
 */
private void assertAfterKeyRejected(String afterValue, String expectedMessage) {
    final ElasticsearchParseException failure = expectThrows(ElasticsearchParseException.class,
        () -> testSearchCase(new MatchAllDocsQuery(), null, Collections.emptyList(),
            () -> {
                DateHistogramValuesSourceBuilder dateSource = new DateHistogramValuesSourceBuilder("date")
                    .field("date")
                    .dateHistogramInterval(DateHistogramInterval.days(1))
                    .format("yyyy-MM-dd");
                return new CompositeAggregationBuilder("name", Collections.singletonList(dateSource))
                    .aggregateAfter(createAfterKey("date", afterValue));
            },
            result -> {}
        ));
    // The parse exception is only a wrapper; the checked details live on its cause.
    final Throwable cause = failure.getCause();
    assertThat(cause, instanceOf(IllegalArgumentException.class));
    assertThat(cause.getMessage(), containsString(expectedMessage));
}
public void testWithDateHistogramAndTimeZone() throws IOException {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(

View File

@ -21,12 +21,15 @@ package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalMultiBucketAggregationTestCase;
import org.joda.time.DateTimeZone;
import org.junit.After;
import java.io.IOException;
@ -41,28 +44,45 @@ import java.util.TreeSet;
import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLengthBetween;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class InternalCompositeTests extends InternalMultiBucketAggregationTestCase<InternalComposite> {
private List<String> sourceNames;
private List<DocValueFormat> formats;
private int[] reverseMuls;
private int[] formats;
private int[] types;
private int size;
/**
 * Picks the {@link DocValueFormat} used for one randomly generated source.
 *
 * @param isLong whether the source produces long keys
 * @return for long sources, randomly either an {@code epoch_second} date format with a one hour offset
 *         or {@link DocValueFormat#RAW}; always {@link DocValueFormat#RAW} for every other source
 */
private static DocValueFormat randomDocValueFormat(boolean isLong) {
    // A specific format is only used for a date histogram on a long/date field;
    // the random draw is skipped entirely for the other types, which stay raw.
    final boolean useDateFormat = isLong && randomBoolean();
    if (useDateFormat == false) {
        return DocValueFormat.RAW;
    }
    return new DocValueFormat.DateTime(Joda.forPattern("epoch_second"), DateTimeZone.forOffsetHours(1));
}
/**
 * Randomizes the fixture shared by the tests: the number of buckets plus, for between 1 and 10
 * sources named {@code field_0}, {@code field_1}, ..., a sort multiplier (1 or -1), a type code
 * in [0, 2] and a doc value format.
 *
 * NOTE(review): {@code formats} is assigned both as a list and as an {@code int[]} below, which looks
 * like removed and added lines of a diff shown together - confirm which lines belong to the current revision.
 */
@Override
public void setUp() throws Exception {
super.setUp();
int numFields = randomIntBetween(1, 10);
size = randomNumberOfBuckets();
sourceNames = new ArrayList<>();
formats = new ArrayList<>();
reverseMuls = new int[numFields];
formats = new int[numFields];
types = new int[numFields];
for (int i = 0; i < numFields; i++) {
sourceNames.add("field_" + i);
// 1 or -1, chosen at random for each source
reverseMuls[i] = randomBoolean() ? 1 : -1;
formats[i] = randomIntBetween(0, 2);
int type = randomIntBetween(0, 2);
types[i] = type;
// type 0 is passed as the long case, the only one that may get a non-raw format
formats.add(randomDocValueFormat(type == 0));
}
}
@ -70,9 +90,10 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
/**
 * Clears the per-test fixture fields by setting them to {@code null} after the parent tear down has run.
 *
 * NOTE(review): {@code sourceNames} and {@code reverseMuls} are each reset twice, which looks like
 * removed and added lines of a diff shown together - confirm which lines belong to the current revision.
 */
@After
public void tearDown() throws Exception {
super.tearDown();
sourceNames= null;
reverseMuls = null;
sourceNames = null;
formats = null;
reverseMuls = null;
types = null;
}
@Override
@ -93,7 +114,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
private CompositeKey createCompositeKey() {
Comparable<?>[] keys = new Comparable<?>[sourceNames.size()];
for (int j = 0; j < keys.length; j++) {
switch (formats[j]) {
switch (types[j]) {
case 0:
keys[j] = randomLong();
break;
@ -123,19 +144,6 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
};
}
/**
 * Builds a comparator over buckets that walks the raw key values position by position and returns
 * the first non-zero comparison, multiplied by the {@code reverseMuls} entry for that position.
 * Buckets whose values compare equal at every position of the first bucket's key yield 0.
 */
@SuppressWarnings("unchecked")
private Comparator<InternalComposite.InternalBucket> getBucketComparator() {
    return (left, right) -> {
        int order = 0;
        // Stop at the first position that produces a non-zero result.
        for (int pos = 0; order == 0 && pos < left.getRawKey().size(); pos++) {
            final Comparable leftValue = (Comparable) left.getRawKey().get(pos);
            order = leftValue.compareTo(right.getRawKey().get(pos)) * reverseMuls[pos];
        }
        return order;
    };
}
@Override
protected InternalComposite createTestInstance(String name, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, InternalAggregations aggregations) {
@ -149,11 +157,11 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
}
keys.add(key);
InternalComposite.InternalBucket bucket =
new InternalComposite.InternalBucket(sourceNames, key, reverseMuls, 1L, aggregations);
new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, 1L, aggregations);
buckets.add(bucket);
}
Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2));
return new InternalComposite(name, size, sourceNames, buckets, reverseMuls, Collections.emptyList(), metaData);
return new InternalComposite(name, size, sourceNames, formats, buckets, reverseMuls, Collections.emptyList(), metaData);
}
@Override
@ -172,7 +180,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
break;
case 1:
buckets = new ArrayList<>(buckets);
buckets.add(new InternalComposite.InternalBucket(sourceNames, createCompositeKey(), reverseMuls,
buckets.add(new InternalComposite.InternalBucket(sourceNames, formats, createCompositeKey(), reverseMuls,
randomLongBetween(1, 100), InternalAggregations.EMPTY)
);
break;
@ -187,7 +195,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa
default:
throw new AssertionError("illegal branch");
}
return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, buckets, reverseMuls,
return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, formats, buckets, reverseMuls,
instance.pipelineAggregators(), metaData);
}