Clean up significant terms aggregation results

* Clean up the generics around significant terms aggregation results
* Reduce code duplicated between `SignificantLongTerms` and
`SignificantStringTerms` by creating `InternalMappedSignificantTerms`
and moving common things there where possible.
* Migrate to `NamedWriteable`
* Line length fixes while I was there
This commit is contained in:
Nik Everett 2016-07-07 22:06:36 -04:00
parent 920bd0cf68
commit f479219ca7
16 changed files with 505 additions and 422 deletions

View File

@ -583,12 +583,8 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]InternalSampler.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]InternalSampler.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]SamplerAggregator.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]sampler[/\\]SamplerAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]GlobalOrdinalsSignificantTermsAggregator.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]GlobalOrdinalsSignificantTermsAggregator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]InternalSignificantTerms.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantLongTerms.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantStringTerms.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantTermsAggregatorFactory.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantTermsAggregatorFactory.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantTermsParametersParser.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificantTermsParametersParser.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]UnmappedSignificantTerms.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]GND.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]GND.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]NXYSignificanceHeuristic.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]NXYSignificanceHeuristic.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]PercentageScore.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]PercentageScore.java" checks="LineLength" />
@ -1044,7 +1040,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]TermsDocCountErrorIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]TermsDocCountErrorIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]TermsShardMinDocCountIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]TermsShardMinDocCountIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]nested[/\\]NestedAggregatorTests.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]nested[/\\]NestedAggregatorTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]SignificanceHeuristicTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]AbstractGeoTestCase.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]AbstractGeoTestCase.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]AvgIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]AvgIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]SumIT.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]SumIT.java" checks="LineLength" />

View File

@ -526,12 +526,14 @@ public class SearchModule extends AbstractModule {
.addResultReader(UnmappedTerms.NAME, UnmappedTerms::new) .addResultReader(UnmappedTerms.NAME, UnmappedTerms::new)
.addResultReader(LongTerms.NAME, LongTerms::new) .addResultReader(LongTerms.NAME, LongTerms::new)
.addResultReader(DoubleTerms.NAME, DoubleTerms::new)); .addResultReader(DoubleTerms.NAME, DoubleTerms::new));
registerAggregation(SignificantTermsAggregationBuilder::new, registerAggregation(new AggregationSpec(SignificantTermsAggregationBuilder::new,
new SignificantTermsParser(significanceHeuristicParserRegistry, queryParserRegistry), new SignificantTermsParser(significanceHeuristicParserRegistry, queryParserRegistry),
SignificantTermsAggregationBuilder.AGGREGATION_NAME_FIELD); SignificantTermsAggregationBuilder.AGGREGATION_NAME_FIELD)
registerAggregation( .addResultReader(SignificantStringTerms.NAME, SignificantStringTerms::new)
new AggregationSpec(RangeAggregationBuilder::new, new RangeParser(), RangeAggregationBuilder.AGGREGATION_NAME_FIELD) .addResultReader(SignificantLongTerms.NAME, SignificantLongTerms::new)
.addResultReader(InternalRange::new)); .addResultReader(UnmappedSignificantTerms.NAME, UnmappedSignificantTerms::new));
registerAggregation(new AggregationSpec(RangeAggregationBuilder::new, new RangeParser(),
RangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalRange::new));
registerAggregation(new AggregationSpec(DateRangeAggregationBuilder::new, new DateRangeParser(), registerAggregation(new AggregationSpec(DateRangeAggregationBuilder::new, new DateRangeParser(),
DateRangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalDateRange::new)); DateRangeAggregationBuilder.AGGREGATION_NAME_FIELD).addResultReader(InternalDateRange::new));
registerAggregation(IpRangeAggregationBuilder::new, new IpRangeParser(), IpRangeAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(IpRangeAggregationBuilder::new, new IpRangeParser(), IpRangeAggregationBuilder.AGGREGATION_NAME_FIELD);
@ -818,9 +820,6 @@ public class SearchModule extends AbstractModule {
static { static {
// buckets // buckets
SignificantStringTerms.registerStreams();
SignificantLongTerms.registerStreams();
UnmappedSignificantTerms.registerStreams();
InternalGeoHashGrid.registerStreams(); InternalGeoHashGrid.registerStreams();
InternalBinaryRange.registerStream(); InternalBinaryRange.registerStream();
InternalHistogram.registerStream(); InternalHistogram.registerStream();

View File

@ -21,7 +21,7 @@ package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.util.PriorityQueue; import org.apache.lucene.util.PriorityQueue;
public class BucketSignificancePriorityQueue extends PriorityQueue<SignificantTerms.Bucket> { public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends PriorityQueue<B> {
public BucketSignificancePriorityQueue(int size) { public BucketSignificancePriorityQueue(int size) {
super(size); super(size);

View File

@ -38,10 +38,11 @@ import org.elasticsearch.search.internal.ContextIndexSearcher;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.emptyList;
/** /**
* An global ordinal based implementation of significant terms, based on {@link SignificantStringTermsAggregator}. * An global ordinal based implementation of significant terms, based on {@link SignificantStringTermsAggregator}.
*/ */
@ -94,7 +95,7 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
long supersetSize = termsAggFactory.getSupersetNumDocs(); long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs; long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantStringTerms.Bucket spare = null; SignificantStringTerms.Bucket spare = null;
for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) { for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) { if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) {
@ -123,21 +124,20 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
// Back at the central reducer these properties will be updated with // Back at the central reducer these properties will be updated with
// global stats // global stats
spare.updateScore(significanceHeuristic); spare.updateScore(significanceHeuristic);
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare); spare = ordered.insertWithOverflow(spare);
} }
final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) { for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop(); final SignificantStringTerms.Bucket bucket = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes); bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd); bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket; list[i] = bucket;
} }
return new SignificantStringTerms(subsetSize, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, Arrays.asList(list), pipelineAggregators(), pipelineAggregators(), metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
metaData());
} }
@Override @Override
@ -146,9 +146,8 @@ public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStri
ContextIndexSearcher searcher = context.searchContext().searcher(); ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader(); IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs(); int supersetSize = topReader.numDocs();
return new SignificantStringTerms(0, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
Collections.<InternalSignificantTerms.Bucket> emptyList(), pipelineAggregators(), metaData());
} }
@Override @Override

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public abstract class InternalMappedSignificantTerms<
A extends InternalMappedSignificantTerms<A, B>,
B extends InternalSignificantTerms.Bucket<B>>
extends InternalSignificantTerms<A, B> {
protected final DocValueFormat format;
protected final long subsetSize;
protected final long supersetSize;
protected final SignificanceHeuristic significanceHeuristic;
protected final List<B> buckets;
protected Map<String, B> bucketMap;
protected InternalMappedSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, DocValueFormat format, long subsetSize, long supersetSize,
SignificanceHeuristic significanceHeuristic, List<B> buckets) {
super(name, requiredSize, minDocCount, pipelineAggregators, metaData);
this.format = format;
this.buckets = buckets;
this.subsetSize = subsetSize;
this.supersetSize = supersetSize;
this.significanceHeuristic = significanceHeuristic;
}
protected InternalMappedSignificantTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
subsetSize = in.readVLong();
supersetSize = in.readVLong();
significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
buckets = in.readList(stream -> bucketReader.read(stream, subsetSize, supersetSize, format));
}
@Override
protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeVLong(subsetSize);
out.writeVLong(supersetSize);
out.writeNamedWriteable(significanceHeuristic);
out.writeList(buckets);
}
@Override
protected List<B> getBucketsInternal() {
return buckets;
}
@Override
public B getBucketByKey(String term) {
if (bucketMap == null) {
bucketMap = buckets.stream().collect(Collectors.toMap(InternalSignificantTerms.Bucket::getKeyAsString, Function.identity()));
}
return bucketMap.get(term);
}
@Override
protected long getSubsetSize() {
return subsetSize;
}
@Override
protected long getSupersetSize() {
return supersetSize;
}
@Override
protected SignificanceHeuristic getSignificanceHeuristic() {
return significanceHeuristic;
}
}

View File

@ -18,7 +18,8 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.significant; package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
@ -28,44 +29,36 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static java.util.Collections.unmodifiableList;
/** /**
* * Result of the significant terms aggregation.
*/ */
public abstract class InternalSignificantTerms<A extends InternalSignificantTerms, B extends InternalSignificantTerms.Bucket> extends public abstract class InternalSignificantTerms<A extends InternalSignificantTerms<A, B>, B extends InternalSignificantTerms.Bucket<B>>
InternalMultiBucketAggregation<A, B> implements SignificantTerms, ToXContent, Streamable { extends InternalMultiBucketAggregation<A, B> implements SignificantTerms, ToXContent {
protected SignificanceHeuristic significanceHeuristic;
protected int requiredSize;
protected long minDocCount;
protected List<? extends Bucket> buckets;
protected Map<String, Bucket> bucketMap;
protected long subsetSize;
protected long supersetSize;
protected InternalSignificantTerms() {} // for serialization
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod") @SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
public abstract static class Bucket extends SignificantTerms.Bucket { public abstract static class Bucket<B extends Bucket<B>> extends SignificantTerms.Bucket {
/**
* Reads a bucket. Should be a constructor reference.
*/
@FunctionalInterface
public interface Reader<B extends Bucket<B>> {
B read(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException;
}
long bucketOrd; long bucketOrd;
protected InternalAggregations aggregations; protected InternalAggregations aggregations;
double score; double score;
final transient DocValueFormat format; final transient DocValueFormat format;
protected Bucket(long subsetSize, long supersetSize, DocValueFormat format) {
// for serialization
super(subsetSize, supersetSize);
this.format = format;
}
protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, protected Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, DocValueFormat format) { InternalAggregations aggregations, DocValueFormat format) {
super(subsetDf, subsetSize, supersetDf, supersetSize); super(subsetDf, subsetSize, supersetDf, supersetSize);
@ -73,6 +66,14 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
this.format = format; this.format = format;
} }
/**
* Read from a stream.
*/
protected Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) {
super(in, subsetSize, supersetSize);
this.format = format;
}
@Override @Override
public long getSubsetDf() { public long getSubsetDf() {
return subsetDf; return subsetDf;
@ -107,11 +108,11 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
return aggregations; return aggregations;
} }
public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) { public B reduce(List<B> buckets, ReduceContext context) {
long subsetDf = 0; long subsetDf = 0;
long supersetDf = 0; long supersetDf = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size()); List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (Bucket bucket : buckets) { for (B bucket : buckets) {
subsetDf += bucket.subsetDf; subsetDf += bucket.subsetDf;
supersetDf += bucket.supersetDf; supersetDf += bucket.supersetDf;
aggregationsList.add(bucket.aggregations); aggregationsList.add(bucket.aggregations);
@ -120,7 +121,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs); return newBucket(subsetDf, subsetSize, supersetDf, supersetSize, aggs);
} }
abstract Bucket newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations); abstract B newBucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations);
@Override @Override
public double getSignificanceScore() { public double getSignificanceScore() {
@ -128,90 +129,102 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
} }
} }
protected DocValueFormat format; protected final int requiredSize;
protected final long minDocCount;
protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, DocValueFormat format, int requiredSize, protected InternalSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
long minDocCount, SignificanceHeuristic significanceHeuristic, List<? extends Bucket> buckets, Map<String, Object> metaData) {
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData); super(name, pipelineAggregators, metaData);
this.requiredSize = requiredSize; this.requiredSize = requiredSize;
this.minDocCount = minDocCount; this.minDocCount = minDocCount;
this.buckets = buckets;
this.subsetSize = subsetSize;
this.supersetSize = supersetSize;
this.significanceHeuristic = significanceHeuristic;
this.format = Objects.requireNonNull(format);
} }
/**
* Read from a stream.
*/
protected InternalSignificantTerms(StreamInput in) throws IOException {
super(in);
requiredSize = readSize(in);
minDocCount = in.readVLong();
}
protected final void doWriteTo(StreamOutput out) throws IOException {
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
writeTermTypeInfoTo(out);
}
protected abstract void writeTermTypeInfoTo(StreamOutput out) throws IOException;
@Override @Override
public Iterator<SignificantTerms.Bucket> iterator() { public Iterator<SignificantTerms.Bucket> iterator() {
Object o = buckets.iterator(); return getBuckets().iterator();
return (Iterator<SignificantTerms.Bucket>) o;
} }
@Override @Override
public List<SignificantTerms.Bucket> getBuckets() { public List<SignificantTerms.Bucket> getBuckets() {
Object o = buckets; return unmodifiableList(getBucketsInternal());
return (List<SignificantTerms.Bucket>) o;
} }
@Override protected abstract List<B> getBucketsInternal();
public SignificantTerms.Bucket getBucketByKey(String term) {
if (bucketMap == null) {
bucketMap = new HashMap<>(buckets.size());
for (Bucket bucket : buckets) {
bucketMap.put(bucket.getKeyAsString(), bucket);
}
}
return bucketMap.get(term);
}
@Override @Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) { public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long globalSubsetSize = 0; long globalSubsetSize = 0;
long globalSupersetSize = 0; long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the // Compute the overall result set size and the corpus size using the
// top-level Aggregations from each shard // top-level Aggregations from each shard
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation; InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
globalSubsetSize += terms.subsetSize; globalSubsetSize += terms.getSubsetSize();
globalSupersetSize += terms.supersetSize; globalSupersetSize += terms.getSupersetSize();
} }
Map<String, List<InternalSignificantTerms.Bucket>> buckets = new HashMap<>(); Map<String, List<B>> buckets = new HashMap<>();
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation; InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
for (Bucket bucket : terms.buckets) { for (B bucket : terms.getBucketsInternal()) {
List<Bucket> existingBuckets = buckets.get(bucket.getKeyAsString()); List<B> existingBuckets = buckets.get(bucket.getKeyAsString());
if (existingBuckets == null) { if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size()); existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(bucket.getKeyAsString(), existingBuckets); buckets.put(bucket.getKeyAsString(), existingBuckets);
} }
// Adjust the buckets with the global stats representing the // Adjust the buckets with the global stats representing the
// total size of the pots from which the stats are drawn // total size of the pots from which the stats are drawn
existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize, bucket.aggregations)); existingBuckets.add(bucket.newBucket(bucket.getSubsetDf(), globalSubsetSize, bucket.getSupersetDf(), globalSupersetSize,
bucket.aggregations));
} }
} }
significanceHeuristic.initialize(reduceContext); getSignificanceHeuristic().initialize(reduceContext);
final int size = Math.min(requiredSize, buckets.size()); final int size = Math.min(requiredSize, buckets.size());
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); BucketSignificancePriorityQueue<B> ordered = new BucketSignificancePriorityQueue<>(size);
for (Map.Entry<String, List<Bucket>> entry : buckets.entrySet()) { for (Map.Entry<String, List<B>> entry : buckets.entrySet()) {
List<Bucket> sameTermBuckets = entry.getValue(); List<B> sameTermBuckets = entry.getValue();
final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
b.updateScore(significanceHeuristic); b.updateScore(getSignificanceHeuristic());
if ((b.score > 0) && (b.subsetDf >= minDocCount)) { if ((b.score > 0) && (b.subsetDf >= minDocCount)) {
ordered.insertWithOverflow(b); ordered.insertWithOverflow(b);
} }
} }
Bucket[] list = new Bucket[ordered.size()]; B[] list = createBucketsArray(ordered.size());
for (int i = ordered.size() - 1; i >= 0; i--) { for (int i = ordered.size() - 1; i >= 0; i--) {
list[i] = (Bucket) ordered.pop(); list[i] = ordered.pop();
} }
return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list), this); return create(globalSubsetSize, globalSupersetSize, Arrays.asList(list));
} }
protected abstract A create(long subsetSize, long supersetSize, List<InternalSignificantTerms.Bucket> buckets, protected abstract A create(long subsetSize, long supersetSize, List<B> buckets);
InternalSignificantTerms prototype);
/**
* Create an array to hold some buckets. Used in collecting the results.
*/
protected abstract B[] createBucketsArray(int size);
protected abstract long getSubsetSize();
protected abstract long getSupersetSize();
protected abstract SignificanceHeuristic getSignificanceHeuristic();
} }

View File

@ -22,56 +22,54 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* * Result of the running the significant terms aggregation on a numeric field.
*/ */
public class SignificantLongTerms extends InternalSignificantTerms<SignificantLongTerms, SignificantLongTerms.Bucket> { public class SignificantLongTerms extends InternalMappedSignificantTerms<SignificantLongTerms, SignificantLongTerms.Bucket> {
public static final String NAME = "siglterms";
public static final Type TYPE = new Type("significant_terms", "siglterms"); static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public SignificantLongTerms readResult(StreamInput in) throws IOException {
SignificantLongTerms buckets = new SignificantLongTerms();
buckets.readFrom(in);
return buckets;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
static class Bucket extends InternalSignificantTerms.Bucket {
long term; long term;
public Bucket(long subsetSize, long supersetSize, DocValueFormat format) {
super(subsetSize, supersetSize, format);
// for serialization
}
public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
DocValueFormat format) { DocValueFormat format) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.term = term; this.term = term;
} }
public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations, double score) { public Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize, long term, InternalAggregations aggregations,
double score) {
this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null); this(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, null);
this.score = score; this.score = score;
} }
public Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException {
super(in, subsetSize, supersetSize, format);
subsetDf = in.readVLong();
supersetDf = in.readVLong();
term = in.readLong();
score = in.readDouble();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
out.writeLong(term);
out.writeDouble(getSignificanceScore());
aggregations.writeTo(out);
}
@Override @Override
public Object getKey() { public Object getKey() {
return term; return term;
@ -84,7 +82,7 @@ public class SignificantLongTerms extends InternalSignificantTerms<SignificantLo
@Override @Override
public String getKeyAsString() { public String getKeyAsString() {
return Long.toString(term); return format.format(term);
} }
@Override @Override
@ -97,25 +95,6 @@ public class SignificantLongTerms extends InternalSignificantTerms<SignificantLo
return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, format); return new Bucket(subsetDf, subsetSize, supersetDf, supersetSize, term, aggregations, format);
} }
@Override
public void readFrom(StreamInput in) throws IOException {
subsetDf = in.readVLong();
supersetDf = in.readVLong();
term = in.readLong();
score = in.readDouble();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
out.writeLong(term);
out.writeDouble(getSignificanceScore());
aggregations.writeTo(out);
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -132,25 +111,29 @@ public class SignificantLongTerms extends InternalSignificantTerms<SignificantLo
} }
} }
SignificantLongTerms() { public SignificantLongTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
} // for serialization Map<String, Object> metaData, DocValueFormat format, long subsetSize, long supersetSize,
SignificanceHeuristic significanceHeuristic, List<Bucket> buckets) {
super(name, requiredSize, minDocCount, pipelineAggregators, metaData, format, subsetSize, supersetSize, significanceHeuristic,
buckets);
}
public SignificantLongTerms(long subsetSize, long supersetSize, String name, DocValueFormat format, int requiredSize, /**
long minDocCount, SignificanceHeuristic significanceHeuristic, List<? extends InternalSignificantTerms.Bucket> buckets, * Read from a stream.
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { */
public SignificantLongTerms(StreamInput in) throws IOException {
super(subsetSize, supersetSize, name, format, requiredSize, minDocCount, significanceHeuristic, buckets, pipelineAggregators, metaData); super(in, Bucket::new);
} }
@Override @Override
public Type type() { public String getWriteableName() {
return TYPE; return NAME;
} }
@Override @Override
public SignificantLongTerms create(List<SignificantLongTerms.Bucket> buckets) { public SignificantLongTerms create(List<SignificantLongTerms.Bucket> buckets) {
return new SignificantLongTerms(this.subsetSize, this.supersetSize, this.name, this.format, this.requiredSize, this.minDocCount, return new SignificantLongTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData, format, subsetSize, supersetSize,
this.significanceHeuristic, buckets, this.pipelineAggregators(), this.metaData); significanceHeuristic, buckets);
} }
@Override @Override
@ -160,59 +143,24 @@ public class SignificantLongTerms extends InternalSignificantTerms<SignificantLo
} }
@Override @Override
protected SignificantLongTerms create(long subsetSize, long supersetSize, protected SignificantLongTerms create(long subsetSize, long supersetSize, List<Bucket> buckets) {
List<org.elasticsearch.search.aggregations.bucket.significant.InternalSignificantTerms.Bucket> buckets, return new SignificantLongTerms(getName(), requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format, subsetSize,
InternalSignificantTerms prototype) { supersetSize, significanceHeuristic, buckets);
return new SignificantLongTerms(subsetSize, supersetSize, prototype.getName(), ((SignificantLongTerms) prototype).format,
prototype.requiredSize, prototype.minDocCount, prototype.significanceHeuristic, buckets, prototype.pipelineAggregators(),
prototype.getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
this.format = in.readNamedWriteable(DocValueFormat.class);
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.subsetSize = in.readVLong();
this.supersetSize = in.readVLong();
significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
int size = in.readVInt();
List<InternalSignificantTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Bucket bucket = new Bucket(subsetSize, supersetSize, format);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVLong(subsetSize);
out.writeVLong(supersetSize);
out.writeNamedWriteable(significanceHeuristic);
out.writeVInt(buckets.size());
for (InternalSignificantTerms.Bucket bucket : buckets) {
bucket.writeTo(out);
}
} }
@Override @Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field("doc_count", subsetSize); builder.field("doc_count", subsetSize);
builder.startArray(CommonFields.BUCKETS); builder.startArray(CommonFields.BUCKETS);
for (InternalSignificantTerms.Bucket bucket : buckets) { for (Bucket bucket : buckets) {
bucket.toXContent(builder, params); bucket.toXContent(builder, params);
} }
builder.endArray(); builder.endArray();
return builder; return builder;
} }
@Override
protected Bucket[] createBucketsArray(int size) {
return new Bucket[size];
}
} }

View File

@ -36,10 +36,11 @@ import org.elasticsearch.search.internal.ContextIndexSearcher;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.emptyList;
/** /**
* *
*/ */
@ -82,7 +83,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
long supersetSize = termsAggFactory.getSupersetNumDocs(); long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs; long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); BucketSignificancePriorityQueue<SignificantLongTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantLongTerms.Bucket spare = null; SignificantLongTerms.Bucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) { for (long i = 0; i < bucketOrds.size(); i++) {
final int docCount = bucketDocCount(i); final int docCount = bucketDocCount(i);
@ -102,18 +103,17 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
spare.updateScore(significanceHeuristic); spare.updateScore(significanceHeuristic);
spare.bucketOrd = i; spare.bucketOrd = i;
spare = (SignificantLongTerms.Bucket) ordered.insertWithOverflow(spare); spare = ordered.insertWithOverflow(spare);
} }
final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; final SignificantLongTerms.Bucket[] list = new SignificantLongTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) { for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantLongTerms.Bucket bucket = (SignificantLongTerms.Bucket) ordered.pop(); final SignificantLongTerms.Bucket bucket = ordered.pop();
bucket.aggregations = bucketAggregations(bucket.bucketOrd); bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket; list[i] = bucket;
} }
return new SignificantLongTerms(subsetSize, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, Arrays.asList(list), pipelineAggregators(), pipelineAggregators(), metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
metaData());
} }
@Override @Override
@ -122,9 +122,8 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
ContextIndexSearcher searcher = context.searchContext().searcher(); ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader(); IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs(); int supersetSize = topReader.numDocs();
return new SignificantLongTerms(0, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantLongTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
Collections.<InternalSignificantTerms.Bucket> emptyList(), pipelineAggregators(), metaData());
} }
@Override @Override

View File

@ -23,56 +23,51 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* * Result of the running the significant terms aggregation on a String field.
*/ */
public class SignificantStringTerms extends InternalSignificantTerms<SignificantStringTerms, SignificantStringTerms.Bucket> { public class SignificantStringTerms extends InternalMappedSignificantTerms<SignificantStringTerms, SignificantStringTerms.Bucket> {
public static final String NAME = "sigsterms";
public static final InternalAggregation.Type TYPE = new Type("significant_terms", "sigsterms"); public static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public SignificantStringTerms readResult(StreamInput in) throws IOException {
SignificantStringTerms buckets = new SignificantStringTerms();
buckets.readFrom(in);
return buckets;
}
};
public static void registerStream() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
public static class Bucket extends InternalSignificantTerms.Bucket {
BytesRef termBytes; BytesRef termBytes;
public Bucket(long subsetSize, long supersetSize, DocValueFormat format) {
// for serialization
super(subsetSize, supersetSize, format);
}
public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations, public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
DocValueFormat format) { DocValueFormat format) {
super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
this.termBytes = term; this.termBytes = term;
} }
/**
* Read from a stream.
*/
public Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException {
super(in, subsetSize, supersetSize, format);
termBytes = in.readBytesRef();
subsetDf = in.readVLong();
supersetDf = in.readVLong();
score = in.readDouble();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(termBytes);
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
out.writeDouble(getSignificanceScore());
aggregations.writeTo(out);
}
public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize,
InternalAggregations aggregations, double score, DocValueFormat format) { InternalAggregations aggregations, double score, DocValueFormat format) {
this(term, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); this(term, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
@ -105,24 +100,6 @@ public class SignificantStringTerms extends InternalSignificantTerms<Significant
return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format); return new Bucket(termBytes, subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
} }
@Override
public void readFrom(StreamInput in) throws IOException {
termBytes = in.readBytesRef();
subsetDf = in.readVLong();
supersetDf = in.readVLong();
score = in.readDouble();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(termBytes);
out.writeVLong(subsetDf);
out.writeVLong(supersetDf);
out.writeDouble(getSignificanceScore());
aggregations.writeTo(out);
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
@ -136,24 +113,29 @@ public class SignificantStringTerms extends InternalSignificantTerms<Significant
} }
} }
SignificantStringTerms() {} // for serialization public SignificantStringTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData, DocValueFormat format, long subsetSize, long supersetSize,
SignificanceHeuristic significanceHeuristic, List<Bucket> buckets) {
super(name, requiredSize, minDocCount, pipelineAggregators, metaData, format, subsetSize, supersetSize, significanceHeuristic,
buckets);
}
public SignificantStringTerms(long subsetSize, long supersetSize, String name, DocValueFormat format, int requiredSize, /**
long minDocCount, SignificanceHeuristic significanceHeuristic, List<? extends InternalSignificantTerms.Bucket> buckets, * Read from a stream.
List<PipelineAggregator> pipelineAggregators, */
Map<String, Object> metaData) { public SignificantStringTerms(StreamInput in) throws IOException {
super(subsetSize, supersetSize, name, format, requiredSize, minDocCount, significanceHeuristic, buckets, pipelineAggregators, metaData); super(in, Bucket::new);
} }
@Override @Override
public Type type() { public String getWriteableName() {
return TYPE; return NAME;
} }
@Override @Override
public SignificantStringTerms create(List<SignificantStringTerms.Bucket> buckets) { public SignificantStringTerms create(List<SignificantStringTerms.Bucket> buckets) {
return new SignificantStringTerms(this.subsetSize, this.supersetSize, this.name, this.format, this.requiredSize, this.minDocCount, return new SignificantStringTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData, format, subsetSize,
this.significanceHeuristic, buckets, this.pipelineAggregators(), this.metaData); supersetSize, significanceHeuristic, buckets);
} }
@Override @Override
@ -163,50 +145,16 @@ public class SignificantStringTerms extends InternalSignificantTerms<Significant
} }
@Override @Override
protected SignificantStringTerms create(long subsetSize, long supersetSize, List<InternalSignificantTerms.Bucket> buckets, protected SignificantStringTerms create(long subsetSize, long supersetSize, List<Bucket> buckets) {
InternalSignificantTerms prototype) { return new SignificantStringTerms(getName(), requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format, subsetSize,
return new SignificantStringTerms(subsetSize, supersetSize, prototype.getName(), prototype.format, prototype.requiredSize, supersetSize, significanceHeuristic, buckets);
prototype.minDocCount, prototype.significanceHeuristic, buckets, prototype.pipelineAggregators(), prototype.getMetaData());
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
this.format = in.readNamedWriteable(DocValueFormat.class);
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.subsetSize = in.readVLong();
this.supersetSize = in.readVLong();
significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
int size = in.readVInt();
List<InternalSignificantTerms.Bucket> buckets = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Bucket bucket = new Bucket(subsetSize, supersetSize, format);
bucket.readFrom(in);
buckets.add(bucket);
}
this.buckets = buckets;
this.bucketMap = null;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
out.writeVLong(subsetSize);
out.writeVLong(supersetSize);
out.writeNamedWriteable(significanceHeuristic);
out.writeVInt(buckets.size());
for (InternalSignificantTerms.Bucket bucket : buckets) {
bucket.writeTo(out);
}
} }
@Override @Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field("doc_count", subsetSize); builder.field("doc_count", subsetSize);
builder.startArray(CommonFields.BUCKETS); builder.startArray(CommonFields.BUCKETS);
for (InternalSignificantTerms.Bucket bucket : buckets) { for (Bucket bucket : buckets) {
//There is a condition (presumably when only one shard has a bucket?) where reduce is not called //There is a condition (presumably when only one shard has a bucket?) where reduce is not called
// and I end up with buckets that contravene the user's min_doc_count criteria in my reducer // and I end up with buckets that contravene the user's min_doc_count criteria in my reducer
if (bucket.subsetDf >= minDocCount) { if (bucket.subsetDf >= minDocCount) {
@ -217,4 +165,8 @@ public class SignificantStringTerms extends InternalSignificantTerms<Significant
return builder; return builder;
} }
@Override
protected Bucket[] createBucketsArray(int size) {
return new Bucket[size];
}
} }

View File

@ -37,10 +37,11 @@ import org.elasticsearch.search.internal.ContextIndexSearcher;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.emptyList;
/** /**
* An aggregator of significant string values. * An aggregator of significant string values.
*/ */
@ -81,7 +82,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
long supersetSize = termsAggFactory.getSupersetNumDocs(); long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs; long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size); BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantStringTerms.Bucket spare = null; SignificantStringTerms.Bucket spare = null;
for (int i = 0; i < bucketOrds.size(); i++) { for (int i = 0; i < bucketOrds.size(); i++) {
final int docCount = bucketDocCount(i); final int docCount = bucketDocCount(i);
@ -105,21 +106,21 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
spare.updateScore(significanceHeuristic); spare.updateScore(significanceHeuristic);
spare.bucketOrd = i; spare.bucketOrd = i;
spare = (SignificantStringTerms.Bucket) ordered.insertWithOverflow(spare); spare = ordered.insertWithOverflow(spare);
} }
final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()]; final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) { for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop(); final SignificantStringTerms.Bucket bucket = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes); bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd); bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket; list[i] = bucket;
} }
return new SignificantStringTerms(subsetSize, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, Arrays.asList(list), pipelineAggregators(), bucketCountThresholds.getMinDocCount(), pipelineAggregators(),
metaData()); metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
} }
@Override @Override
@ -128,9 +129,8 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
ContextIndexSearcher searcher = context.searchContext().searcher(); ContextIndexSearcher searcher = context.searchContext().searcher();
IndexReader topReader = searcher.getIndexReader(); IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs(); int supersetSize = topReader.numDocs();
return new SignificantStringTerms(0, supersetSize, name, format, bucketCountThresholds.getRequiredSize(), return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
bucketCountThresholds.getMinDocCount(), significanceHeuristic, pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
Collections.<InternalSignificantTerms.Bucket> emptyList(), pipelineAggregators(), metaData());
} }
@Override @Override

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.significant; package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@ -27,8 +28,6 @@ import java.util.List;
* An aggregation that collects significant terms in comparison to a background set. * An aggregation that collects significant terms in comparison to a background set.
*/ */
public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> { public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> {
abstract static class Bucket extends InternalMultiBucketAggregation.InternalBucket { abstract static class Bucket extends InternalMultiBucketAggregation.InternalBucket {
long subsetDf; long subsetDf;
@ -36,18 +35,21 @@ public interface SignificantTerms extends MultiBucketsAggregation, Iterable<Sign
long supersetDf; long supersetDf;
long supersetSize; long supersetSize;
protected Bucket(long subsetSize, long supersetSize) { Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
// for serialization
this.subsetSize = subsetSize; this.subsetSize = subsetSize;
this.supersetSize = supersetSize; this.supersetSize = supersetSize;
}
Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
this(subsetSize, supersetSize);
this.subsetDf = subsetDf; this.subsetDf = subsetDf;
this.supersetDf = supersetDf; this.supersetDf = supersetDf;
} }
/**
* Read from a stream.
*/
protected Bucket(StreamInput in, long subsetSize, long supersetSize) {
this.subsetSize = subsetSize;
this.supersetSize = supersetSize;
}
abstract int compareTerm(SignificantTerms.Bucket other); abstract int compareTerm(SignificantTerms.Bucket other);
public abstract double getSignificanceScore(); public abstract double getSignificanceScore();

View File

@ -24,7 +24,9 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.JLHScore; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.JLHScore;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
@ -46,7 +48,8 @@ import java.util.Objects;
* *
*/ */
public class SignificantTermsAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource, SignificantTermsAggregationBuilder> { public class SignificantTermsAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource, SignificantTermsAggregationBuilder> {
public static final String NAME = SignificantStringTerms.TYPE.name(); public static final String NAME = "significant_terms";
public static final InternalAggregation.Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
static final ParseField BACKGROUND_FILTER = new ParseField("background_filter"); static final ParseField BACKGROUND_FILTER = new ParseField("background_filter");
@ -63,14 +66,14 @@ public class SignificantTermsAggregationBuilder extends ValuesSourceAggregationB
private SignificanceHeuristic significanceHeuristic = DEFAULT_SIGNIFICANCE_HEURISTIC; private SignificanceHeuristic significanceHeuristic = DEFAULT_SIGNIFICANCE_HEURISTIC;
public SignificantTermsAggregationBuilder(String name, ValueType valueType) { public SignificantTermsAggregationBuilder(String name, ValueType valueType) {
super(name, SignificantStringTerms.TYPE, ValuesSourceType.ANY, valueType); super(name, TYPE, ValuesSourceType.ANY, valueType);
} }
/** /**
* Read from a Stream. * Read from a Stream.
*/ */
public SignificantTermsAggregationBuilder(StreamInput in) throws IOException { public SignificantTermsAggregationBuilder(StreamInput in) throws IOException {
super(in, SignificantStringTerms.TYPE, ValuesSourceType.ANY); super(in, TYPE, ValuesSourceType.ANY);
bucketCountThresholds = new BucketCountThresholds(in); bucketCountThresholds = new BucketCountThresholds(in);
executionHint = in.readOptionalString(); executionHint = in.readOptionalString();
filterBuilder = in.readOptionalNamedWriteable(QueryBuilder.class); filterBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);

View File

@ -18,69 +18,75 @@
*/ */
package org.elasticsearch.search.aggregations.bucket.significant; package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.UnmappedTerms;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static java.util.Collections.emptyList;
/** /**
* * Result of the running the significant terms aggregation on an unmapped field.
*/ */
public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedSignificantTerms, InternalSignificantTerms.Bucket> { public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedSignificantTerms, UnmappedSignificantTerms.Bucket> {
public static final String NAME = "umsigterms";
public static final Type TYPE = new Type("significant_terms", "umsigterms"); /**
* Concrete type that can't be built because Java needs a concrent type so {@link InternalTerms.Bucket} can have a self type but
private static final List<Bucket> BUCKETS = Collections.emptyList(); * {@linkplain UnmappedTerms} doesn't ever need to build it because it never returns any buckets.
private static final Map<String, Bucket> BUCKETS_MAP = Collections.emptyMap(); */
protected abstract static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { private Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations,
@Override DocValueFormat format) {
public UnmappedSignificantTerms readResult(StreamInput in) throws IOException { super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations, format);
UnmappedSignificantTerms buckets = new UnmappedSignificantTerms();
buckets.readFrom(in);
return buckets;
} }
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
} }
UnmappedSignificantTerms() {} // for serialization public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, requiredSize, minDocCount, pipelineAggregators, metaData);
}
public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { /**
//We pass zero for index/subset sizes because for the purpose of significant term analysis * Read from a stream.
// we assume an unmapped index's size is irrelevant to the proceedings. */
super(0, 0, name, DocValueFormat.RAW, requiredSize, minDocCount, SignificantTermsAggregationBuilder.DEFAULT_SIGNIFICANCE_HEURISTIC, public UnmappedSignificantTerms(StreamInput in) throws IOException {
BUCKETS, pipelineAggregators, metaData); super(in);
} }
@Override @Override
public Type type() { protected void writeTermTypeInfoTo(StreamOutput out) throws IOException {
return TYPE; // Nothing to write
} }
@Override @Override
public UnmappedSignificantTerms create(List<InternalSignificantTerms.Bucket> buckets) { public String getWriteableName() {
return new UnmappedSignificantTerms(this.name, this.requiredSize, this.minDocCount, this.pipelineAggregators(), this.metaData); return NAME;
} }
@Override @Override
public InternalSignificantTerms.Bucket createBucket(InternalAggregations aggregations, InternalSignificantTerms.Bucket prototype) { public UnmappedSignificantTerms create(List<Bucket> buckets) {
return new UnmappedSignificantTerms(name, requiredSize, minDocCount, pipelineAggregators(), metaData);
}
@Override
public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
throw new UnsupportedOperationException("not supported for UnmappedSignificantTerms"); throw new UnsupportedOperationException("not supported for UnmappedSignificantTerms");
} }
@Override @Override
protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, List<Bucket> buckets, InternalSignificantTerms prototype) { protected UnmappedSignificantTerms create(long subsetSize, long supersetSize, List<Bucket> buckets) {
throw new UnsupportedOperationException("not supported for UnmappedSignificantTerms"); throw new UnsupportedOperationException("not supported for UnmappedSignificantTerms");
} }
@ -94,24 +100,39 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
return this; return this;
} }
@Override
protected void doReadFrom(StreamInput in) throws IOException {
this.requiredSize = readSize(in);
this.minDocCount = in.readVLong();
this.buckets = BUCKETS;
this.bucketMap = BUCKETS_MAP;
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
writeSize(requiredSize, out);
out.writeVLong(minDocCount);
}
@Override @Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS).endArray(); builder.startArray(CommonFields.BUCKETS).endArray();
return builder; return builder;
} }
@Override
protected Bucket[] createBucketsArray(int size) {
return new Bucket[size];
}
@Override
protected List<Bucket> getBucketsInternal() {
return emptyList();
}
@Override
public SignificantTerms.Bucket getBucketByKey(String term) {
return null;
}
@Override
protected SignificanceHeuristic getSignificanceHeuristic() {
throw new UnsupportedOperationException();
}
@Override
protected long getSubsetSize() {
return 0;
}
@Override
protected long getSupersetSize() {
return 0;
}
} }

View File

@ -63,6 +63,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
@Override @Override
protected void writeTermTypeInfoTo(StreamOutput out) throws IOException { protected void writeTermTypeInfoTo(StreamOutput out) throws IOException {
// Nothing to write
} }
@Override @Override

View File

@ -41,18 +41,22 @@ import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Locale;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms; import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.notNullValue;
/** /**
* *
@ -385,6 +389,27 @@ public class SignificantTermsIT extends ESIntegTestCase {
checkExpectedStringTermsFound(topTerms); checkExpectedStringTermsFound(topTerms);
} }
public void testPartiallyUnmappedWithFormat() throws Exception {
SearchResponse response = client().prepareSearch("idx_unmapped", "test")
.setSearchType(SearchType.QUERY_AND_FETCH)
.setQuery(boolQuery().should(termQuery("_all", "the")).should(termQuery("_all", "terje")))
.setFrom(0).setSize(60).setExplain(true)
.addAggregation(significantTerms("mySignificantTerms")
.field("fact_category")
.executionHint(randomExecutionHint())
.minDocCount(1)
.format("0000"))
.execute()
.actionGet();
assertSearchResponse(response);
SignificantTerms topTerms = response.getAggregations().get("mySignificantTerms");
for (int i = 1; i <= 3; i++) {
String key = String.format(Locale.ROOT, "%04d", i);
SignificantTerms.Bucket bucket = topTerms.getBucketByKey(key);
assertThat(bucket, notNullValue());
assertThat(bucket.getKeyAsString(), equalTo(key));
}
}
private void checkExpectedStringTermsFound(SignificantTerms topTerms) { private void checkExpectedStringTermsFound(SignificantTerms topTerms) {
HashMap<String,Bucket>topWords=new HashMap<>(); HashMap<String,Bucket>topWords=new HashMap<>();

View File

@ -58,22 +58,23 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms; import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
*
*/
public class SignificanceHeuristicTests extends ESTestCase { public class SignificanceHeuristicTests extends ESTestCase {
static class SignificantTermsTestSearchContext extends TestSearchContext { static class SignificantTermsTestSearchContext extends TestSearchContext {
@ -95,13 +96,13 @@ public class SignificanceHeuristicTests extends ESTestCase {
// test that stream output can actually be read - does not replace bwc test // test that stream output can actually be read - does not replace bwc test
public void testStreamResponse() throws Exception { public void testStreamResponse() throws Exception {
Version version = randomVersion(random()); Version version = randomVersion(random());
InternalSignificantTerms[] sigTerms = getRandomSignificantTerms(getRandomSignificanceheuristic()); InternalMappedSignificantTerms<?, ?> sigTerms = getRandomSignificantTerms(getRandomSignificanceheuristic());
// write // write
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(version); out.setVersion(version);
sigTerms[0].writeTo(out); out.writeNamedWriteable(sigTerms);
// read // read
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
@ -110,11 +111,11 @@ public class SignificanceHeuristicTests extends ESTestCase {
new SearchModule(Settings.EMPTY, registry, false, emptyList()); // populates the registry through side effects new SearchModule(Settings.EMPTY, registry, false, emptyList()); // populates the registry through side effects
in = new NamedWriteableAwareStreamInput(in, registry); in = new NamedWriteableAwareStreamInput(in, registry);
in.setVersion(version); in.setVersion(version);
sigTerms[1].readFrom(in); InternalMappedSignificantTerms<?, ?> read = (InternalMappedSignificantTerms<?, ?>) in.readNamedWriteable(InternalAggregation.class);
assertTrue(sigTerms[1].significanceHeuristic.equals(sigTerms[0].significanceHeuristic)); assertEquals(sigTerms.significanceHeuristic, read.significanceHeuristic);
InternalSignificantTerms.Bucket originalBucket = (InternalSignificantTerms.Bucket) sigTerms[0].buckets.get(0); SignificantTerms.Bucket originalBucket = sigTerms.getBuckets().get(0);
InternalSignificantTerms.Bucket streamedBucket = (InternalSignificantTerms.Bucket) sigTerms[1].buckets.get(0); SignificantTerms.Bucket streamedBucket = read.getBuckets().get(0);
assertThat(originalBucket.getKeyAsString(), equalTo(streamedBucket.getKeyAsString())); assertThat(originalBucket.getKeyAsString(), equalTo(streamedBucket.getKeyAsString()));
assertThat(originalBucket.getSupersetDf(), equalTo(streamedBucket.getSupersetDf())); assertThat(originalBucket.getSupersetDf(), equalTo(streamedBucket.getSupersetDf()));
assertThat(originalBucket.getSubsetDf(), equalTo(streamedBucket.getSubsetDf())); assertThat(originalBucket.getSubsetDf(), equalTo(streamedBucket.getSubsetDf()));
@ -122,22 +123,18 @@ public class SignificanceHeuristicTests extends ESTestCase {
assertThat(streamedBucket.getSupersetSize(), equalTo(20L)); assertThat(streamedBucket.getSupersetSize(), equalTo(20L));
} }
InternalSignificantTerms[] getRandomSignificantTerms(SignificanceHeuristic heuristic) { InternalMappedSignificantTerms<?, ?> getRandomSignificantTerms(SignificanceHeuristic heuristic) {
InternalSignificantTerms[] sTerms = new InternalSignificantTerms[2];
ArrayList<InternalSignificantTerms.Bucket> buckets = new ArrayList<>();
if (randomBoolean()) { if (randomBoolean()) {
buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, null)); SignificantLongTerms.Bucket bucket = new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY,
sTerms[0] = new SignificantLongTerms(10, 20, "some_name", DocValueFormat.RAW, 1, 1, heuristic, buckets, DocValueFormat.RAW);
Collections.emptyList(), null); return new SignificantLongTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic,
sTerms[1] = new SignificantLongTerms(); singletonList(bucket));
} else { } else {
BytesRef term = new BytesRef("someterm"); SignificantStringTerms.Bucket bucket = new SignificantStringTerms.Bucket(new BytesRef("someterm"), 1, 2, 3, 4,
buckets.add(new SignificantStringTerms.Bucket(term, 1, 2, 3, 4, InternalAggregations.EMPTY, DocValueFormat.RAW)); InternalAggregations.EMPTY, DocValueFormat.RAW);
sTerms[0] = new SignificantStringTerms(10, 20, "some_name", DocValueFormat.RAW, 1, 1, heuristic, buckets, return new SignificantStringTerms("some_name", 1, 1, emptyList(), null, DocValueFormat.RAW, 10, 20, heuristic,
Collections.emptyList(), null); singletonList(bucket));
sTerms[1] = new SignificantStringTerms();
} }
return sTerms;
} }
SignificanceHeuristic getRandomSignificanceheuristic() { SignificanceHeuristic getRandomSignificanceheuristic() {
@ -165,37 +162,54 @@ public class SignificanceHeuristicTests extends ESTestCase {
// Create aggregations as they might come from three different shards and return as list. // Create aggregations as they might come from three different shards and return as list.
private List<InternalAggregation> createInternalAggregations() { private List<InternalAggregation> createInternalAggregations() {
String type = randomBoolean() ? "long" : "string";
SignificanceHeuristic significanceHeuristic = getRandomSignificanceheuristic(); SignificanceHeuristic significanceHeuristic = getRandomSignificanceheuristic();
TestAggFactory<?, ?> factory = randomBoolean() ? new StringTestAggFactory() : new LongTestAggFactory();
List<InternalAggregation> aggs = new ArrayList<>(); List<InternalAggregation> aggs = new ArrayList<>();
List<InternalSignificantTerms.Bucket> terms0Buckets = new ArrayList<>(); aggs.add(factory.createAggregation(significanceHeuristic, 4, 10, 1, (f, i) -> f.createBucket(4, 4, 5, 10, 0)));
terms0Buckets.add(createBucket(type, 4, 4, 5, 10, 0)); aggs.add(factory.createAggregation(significanceHeuristic, 4, 10, 1, (f, i) -> f.createBucket(4, 4, 5, 10, 1)));
aggs.add(createAggregation(type, significanceHeuristic, terms0Buckets, 4, 10)); aggs.add(factory.createAggregation(significanceHeuristic, 8, 10, 2, (f, i) -> f.createBucket(4, 4, 5, 10, i)));
List<InternalSignificantTerms.Bucket> terms1Buckets = new ArrayList<>();
terms0Buckets.add(createBucket(type, 4, 4, 5, 10, 1));
aggs.add(createAggregation(type, significanceHeuristic, terms1Buckets, 4, 10));
List<InternalSignificantTerms.Bucket> terms01Buckets = new ArrayList<>();
terms0Buckets.add(createBucket(type, 4, 8, 5, 10, 0));
terms0Buckets.add(createBucket(type, 4, 8, 5, 10, 1));
aggs.add(createAggregation(type, significanceHeuristic, terms01Buckets, 8, 10));
return aggs; return aggs;
} }
private InternalSignificantTerms createAggregation(String type, SignificanceHeuristic significanceHeuristic, List<InternalSignificantTerms.Bucket> buckets, long subsetSize, long supersetSize) { private abstract class TestAggFactory<A extends InternalSignificantTerms<A, B>, B extends InternalSignificantTerms.Bucket<B>> {
if (type.equals("string")) { final A createAggregation(SignificanceHeuristic significanceHeuristic, long subsetSize, long supersetSize, int bucketCount,
return new SignificantStringTerms(subsetSize, supersetSize, "sig_terms", DocValueFormat.RAW, 2, -1, significanceHeuristic, buckets, new ArrayList<PipelineAggregator>(), new HashMap<String, Object>()); BiFunction<TestAggFactory<?, B>, Integer, B> bucketFactory) {
} else { List<B> buckets = IntStream.range(0, bucketCount).mapToObj(i -> bucketFactory.apply(this, i))
return new SignificantLongTerms(subsetSize, supersetSize, "sig_terms", DocValueFormat.RAW, 2, -1, significanceHeuristic, buckets, new ArrayList<PipelineAggregator>(), new HashMap<String, Object>()); .collect(Collectors.toList());
} return createAggregation(significanceHeuristic, subsetSize, supersetSize, buckets);
} }
private InternalSignificantTerms.Bucket createBucket(String type, long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) { abstract A createAggregation(SignificanceHeuristic significanceHeuristic, long subsetSize, long supersetSize, List<B> buckets);
if (type.equals("string")) {
return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF, subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW); abstract B createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label);
} else { }
return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY, DocValueFormat.RAW); private class StringTestAggFactory extends TestAggFactory<SignificantStringTerms, SignificantStringTerms.Bucket> {
@Override
SignificantStringTerms createAggregation(SignificanceHeuristic significanceHeuristic, long subsetSize, long supersetSize,
List<SignificantStringTerms.Bucket> buckets) {
return new SignificantStringTerms("sig_terms", 2, -1, emptyList(),
emptyMap(), DocValueFormat.RAW, subsetSize, supersetSize, significanceHeuristic, buckets);
}
@Override
SignificantStringTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) {
return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF,
subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY, DocValueFormat.RAW);
}
}
private class LongTestAggFactory extends TestAggFactory<SignificantLongTerms, SignificantLongTerms.Bucket> {
@Override
SignificantLongTerms createAggregation(SignificanceHeuristic significanceHeuristic, long subsetSize, long supersetSize,
List<SignificantLongTerms.Bucket> buckets) {
return new SignificantLongTerms("sig_terms", 2, -1, new ArrayList<PipelineAggregator>(), emptyMap(), DocValueFormat.RAW,
subsetSize, supersetSize, significanceHeuristic, buckets);
}
@Override
SignificantLongTerms.Bucket createBucket(long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) {
return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY,
DocValueFormat.RAW);
} }
} }
@ -214,14 +228,22 @@ public class SignificanceHeuristicTests extends ESTestCase {
// test mutual information with string // test mutual information with string
boolean includeNegatives = randomBoolean(); boolean includeNegatives = randomBoolean();
boolean backgroundIsSuperset = randomBoolean(); boolean backgroundIsSuperset = randomBoolean();
assertThat(parseFromString(heuristicParserMapper, searchContext, "\"mutual_information\":{\"include_negatives\": " + includeNegatives + ", \"background_is_superset\":" + backgroundIsSuperset + "}"), equalTo((SignificanceHeuristic) (new MutualInformation(includeNegatives, backgroundIsSuperset)))); String mutual = "\"mutual_information\":{\"include_negatives\": " + includeNegatives + ", \"background_is_superset\":"
assertThat(parseFromString(heuristicParserMapper, searchContext, "\"chi_square\":{\"include_negatives\": " + includeNegatives + ", \"background_is_superset\":" + backgroundIsSuperset + "}"), equalTo((SignificanceHeuristic) (new ChiSquare(includeNegatives, backgroundIsSuperset)))); + backgroundIsSuperset + "}";
assertEquals(new MutualInformation(includeNegatives, backgroundIsSuperset),
parseFromString(heuristicParserMapper, searchContext, mutual));
String chiSquare = "\"chi_square\":{\"include_negatives\": " + includeNegatives + ", \"background_is_superset\":"
+ backgroundIsSuperset + "}";
assertEquals(new ChiSquare(includeNegatives, backgroundIsSuperset),
parseFromString(heuristicParserMapper, searchContext, chiSquare));
// test with builders // test with builders
assertTrue(parseFromBuilder(heuristicParserMapper, searchContext, new JLHScore()) instanceof JLHScore); assertThat(parseFromBuilder(heuristicParserMapper, searchContext, new JLHScore()), instanceOf(JLHScore.class));
assertTrue(parseFromBuilder(heuristicParserMapper, searchContext, new GND(backgroundIsSuperset)) instanceof GND); assertThat(parseFromBuilder(heuristicParserMapper, searchContext, new GND(backgroundIsSuperset)), instanceOf(GND.class));
assertThat(parseFromBuilder(heuristicParserMapper, searchContext, new MutualInformation(includeNegatives, backgroundIsSuperset)), equalTo((SignificanceHeuristic) new MutualInformation(includeNegatives, backgroundIsSuperset))); assertEquals(new MutualInformation(includeNegatives, backgroundIsSuperset),
assertThat(parseFromBuilder(heuristicParserMapper, searchContext, new ChiSquare(includeNegatives, backgroundIsSuperset)), equalTo((SignificanceHeuristic) new ChiSquare(includeNegatives, backgroundIsSuperset))); parseFromBuilder(heuristicParserMapper, searchContext, new MutualInformation(includeNegatives, backgroundIsSuperset)));
assertEquals(new ChiSquare(includeNegatives, backgroundIsSuperset),
parseFromBuilder(heuristicParserMapper, searchContext, new ChiSquare(includeNegatives, backgroundIsSuperset)));
// test exceptions // test exceptions
String faultyHeuristicdefinition = "\"mutual_information\":{\"include_negatives\": false, \"some_unknown_field\": false}"; String faultyHeuristicdefinition = "\"mutual_information\":{\"include_negatives\": false, \"some_unknown_field\": false}";
@ -246,7 +268,8 @@ public class SignificanceHeuristicTests extends ESTestCase {
IndicesQueriesRegistry registry = new IndicesQueriesRegistry(); IndicesQueriesRegistry registry = new IndicesQueriesRegistry();
try { try {
XContentParser stParser = JsonXContent.jsonXContent.createParser("{\"field\":\"text\", " + faultyHeuristicDefinition + ",\"min_doc_count\":200}"); XContentParser stParser = JsonXContent.jsonXContent.createParser(
"{\"field\":\"text\", " + faultyHeuristicDefinition + ",\"min_doc_count\":200}");
QueryParseContext parseContext = new QueryParseContext(registry, stParser, ParseFieldMatcher.STRICT); QueryParseContext parseContext = new QueryParseContext(registry, stParser, ParseFieldMatcher.STRICT);
stParser.nextToken(); stParser.nextToken();
new SignificantTermsParser(significanceHeuristicParserRegistry, registry).parse("testagg", parseContext); new SignificantTermsParser(significanceHeuristicParserRegistry, registry).parse("testagg", parseContext);
@ -283,7 +306,8 @@ public class SignificanceHeuristicTests extends ESTestCase {
protected SignificanceHeuristic parseFromString(ParseFieldRegistry<SignificanceHeuristicParser> significanceHeuristicParserRegistry, protected SignificanceHeuristic parseFromString(ParseFieldRegistry<SignificanceHeuristicParser> significanceHeuristicParserRegistry,
SearchContext searchContext, String heuristicString) throws IOException { SearchContext searchContext, String heuristicString) throws IOException {
XContentParser stParser = JsonXContent.jsonXContent.createParser("{\"field\":\"text\", " + heuristicString + ", \"min_doc_count\":200}"); XContentParser stParser = JsonXContent.jsonXContent.createParser(
"{\"field\":\"text\", " + heuristicString + ", \"min_doc_count\":200}");
return parseSignificanceHeuristic(significanceHeuristicParserRegistry, searchContext, stParser); return parseSignificanceHeuristic(significanceHeuristicParserRegistry, searchContext, stParser);
} }