cleanup formatting in significant aggs package

parent 9500dddad3
commit 35696aeb75
@@ -23,13 +23,12 @@ import org.apache.lucene.util.PriorityQueue;
 
 public class BucketSignificancePriorityQueue extends PriorityQueue<SignificantTerms.Bucket> {
-
 
     public BucketSignificancePriorityQueue(int size) {
         super(size);
     }
 
     @Override
     protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) {
-        return o1.getSignificanceScore() < o2.getSignificanceScore();
+        return o1.getSignificanceScore() < o2.getSignificanceScore();
     }
 }

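Note: the class above builds on Lucene's `PriorityQueue`, where `lessThan` decides which element sits at the head and is evicted first, so a fixed-size queue naturally retains the top-N highest-scoring buckets. A minimal sketch of that pattern, under those assumptions (the `ScoredTerm` type and the class names are illustrative, not part of this commit):

```java
import org.apache.lucene.util.PriorityQueue;

// Illustrative stand-in for SignificantTerms.Bucket: anything that carries a score.
class ScoredTerm {
    final String term;
    final double score;

    ScoredTerm(String term, double score) {
        this.term = term;
        this.score = score;
    }
}

// Fixed-capacity queue that keeps the highest-scoring entries: because lessThan()
// ranks lower scores first, insertWithOverflow() evicts the current minimum when full.
class TopScoredTerms extends PriorityQueue<ScoredTerm> {

    TopScoredTerms(int size) {
        super(size);
    }

    @Override
    protected boolean lessThan(ScoredTerm a, ScoredTerm b) {
        return a.score < b.score;
    }
}
```

Popping such a queue yields entries in ascending score order, which is why the aggregators later in this diff fill their result arrays from the back.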
@@ -40,6 +40,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
     protected Map<String, Bucket> bucketMap;
     protected long subsetSize;
     protected long supersetSize;
 
     protected InternalSignificantTerms() {} // for serialization
+
     // TODO updateScore call in constructor to be cleaned up as part of adding pluggable scoring algos
@@ -58,39 +59,39 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
         }
 
         @Override
-        public long getSubsetDf(){
+        public long getSubsetDf() {
            return subsetDf;
         }
 
         @Override
-        public long getSupersetDf(){
+        public long getSupersetDf() {
            return supersetDf;
         }
 
         @Override
-        public long getSupersetSize(){
+        public long getSupersetSize() {
            return supersetSize;
         }
 
         @Override
-        public long getSubsetSize(){
+        public long getSubsetSize() {
            return subsetSize;
         }
 
        /**
         * Calculates the significance of a term in a sample against a background of
         * normal distributions by comparing the changes in frequency. This is the heart
-        * of the significant terms feature.
-        *
+        * of the significant terms feature.
+        * <p/>
         * TODO - allow pluggable scoring implementations
-        *
-        * @param subsetFreq The frequency of the term in the selected sample
-        * @param subsetSize The size of the selected sample (typically number of docs)
+        *
+        * @param subsetFreq The frequency of the term in the selected sample
+        * @param subsetSize The size of the selected sample (typically number of docs)
         * @param supersetFreq The frequency of the term in the superset from which the sample was taken
         * @param supersetSize The size of the superset from which the sample was taken (typically number of docs)
         * @return a "significance" score
         */
-        public static final double getSampledTermSignificance(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) {
+        public static double getSampledTermSignificance(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) {
            if ((subsetSize == 0) || (supersetSize == 0)) {
                // avoid any divide by zero issues
                return 0;
@@ -125,7 +126,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
 
         public void updateScore() {
             score = getSampledTermSignificance(subsetDf, subsetSize, supersetDf, supersetSize);
-        }
+        }
 
         @Override
         public long getDocCount() {
@@ -161,7 +162,7 @@ public abstract class InternalSignificantTerms extends InternalAggregation imple
         @Override
         public double getSignificanceScore() {
             return score;
         }
-    }
+    }
 
     protected InternalSignificantTerms(long subsetSize, long supersetSize, String name, int requiredSize, long minDocCount, Collection<Bucket> buckets) {

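Note: the Javadoc above describes the scoring idea, but this hunk cuts off before the body of `getSampledTermSignificance`. The following is only a hedged sketch of one frequency-change heuristic in that spirit (weighting the absolute change in term frequency by the relative change); it is not claimed to be the exact formula used by the method.

```java
// Hedged sketch of a subset-vs-superset frequency heuristic; illustrative only,
// not the actual body of InternalSignificantTerms.getSampledTermSignificance.
public final class SignificanceSketch {

    public static double score(long subsetFreq, long subsetSize, long supersetFreq, long supersetSize) {
        if ((subsetSize == 0) || (supersetSize == 0)) {
            return 0; // mirrors the divide-by-zero guard visible in the hunk above
        }
        double subsetProbability = (double) subsetFreq / (double) subsetSize;
        double supersetProbability = (double) supersetFreq / (double) supersetSize;
        if (supersetProbability == 0) {
            return 0; // term never seen in the background set
        }
        double absoluteChange = subsetProbability - supersetProbability; // favours common terms
        double relativeChange = subsetProbability / supersetProbability; // favours rare terms
        return absoluteChange * relativeChange;                          // balance the two effects
    }
}
```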
@@ -54,7 +54,6 @@ public class SignificantLongTerms extends InternalSignificantTerms {
         AggregationStreams.registerStream(STREAM, TYPE.stream());
     }
-
 
     static class Bucket extends InternalSignificantTerms.Bucket {
 
         long term;
@@ -83,15 +82,17 @@ public class SignificantLongTerms extends InternalSignificantTerms {
         public String getKey() {
             return Long.toString(term);
         }
 
 
     }
 
     private ValueFormatter valueFormatter;
 
     SignificantLongTerms() {} // for serialization
 
-    public SignificantLongTerms(long subsetSize, long supersetSize, String name, ValueFormatter valueFormatter, int requiredSize, long minDocCount, Collection<InternalSignificantTerms.Bucket> buckets) {
-        super(subsetSize, supersetSize,name, requiredSize, minDocCount, buckets);
+    public SignificantLongTerms(long subsetSize, long supersetSize, String name, ValueFormatter valueFormatter,
+                                int requiredSize, long minDocCount, Collection<InternalSignificantTerms.Bucket> buckets) {
+
+        super(subsetSize, supersetSize, name, requiredSize, minDocCount, buckets);
         this.valueFormatter = valueFormatter;
     }
@@ -100,7 +101,6 @@ public class SignificantLongTerms extends InternalSignificantTerms {
         return TYPE;
     }
-
 
     @Override
     public void readFrom(StreamInput in) throws IOException {
         this.name = in.readString();
@@ -109,15 +109,14 @@ public class SignificantLongTerms extends InternalSignificantTerms {
         this.minDocCount = in.readVLong();
         this.subsetSize = in.readVLong();
         this.supersetSize = in.readVLong();
 
         int size = in.readVInt();
         List<InternalSignificantTerms.Bucket> buckets = new ArrayList<InternalSignificantTerms.Bucket>(size);
         for (int i = 0; i < size; i++) {
-            long subsetDf=in.readVLong();
-            long supersetDf=in.readVLong();
-            long term=in.readLong();
-            buckets.add(new Bucket(subsetDf, subsetSize, supersetDf,
-                    supersetSize, term, InternalAggregations.readAggregations(in)));
+            long subsetDf = in.readVLong();
+            long supersetDf = in.readVLong();
+            long term = in.readLong();
+            buckets.add(new Bucket(subsetDf, subsetSize, supersetDf,supersetSize, term, InternalAggregations.readAggregations(in)));
         }
         this.buckets = buckets;
         this.bucketMap = null;
@@ -130,11 +129,11 @@ public class SignificantLongTerms extends InternalSignificantTerms {
         writeSize(requiredSize, out);
         out.writeVLong(minDocCount);
         out.writeVLong(subsetSize);
-        out.writeVLong(supersetSize);
+        out.writeVLong(supersetSize);
         out.writeVInt(buckets.size());
         for (InternalSignificantTerms.Bucket bucket : buckets) {
             out.writeVLong(((Bucket) bucket).subsetDf);
-            out.writeVLong(((Bucket) bucket).supersetDf);
+            out.writeVLong(((Bucket) bucket).supersetDf);
             out.writeLong(((Bucket) bucket).term);
             ((InternalAggregations) bucket.getAggregations()).writeTo(out);
         }

@@ -37,18 +37,19 @@ import java.util.Collections;
 
 public class SignificantLongTermsAggregator extends LongTermsAggregator {
 
     public SignificantLongTermsAggregator(String name, AggregatorFactories factories, NumericValuesSource valuesSource,
-            long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount,
-            AggregationContext aggregationContext, Aggregator parent,SignificantTermsAggregatorFactory termsAggFactory) {
+            long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount,
+            AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
+
         super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, aggregationContext, parent);
         this.termsAggFactory = termsAggFactory;
     }
 
     protected long numCollectedDocs;
-    private SignificantTermsAggregatorFactory termsAggFactory;
+    private final SignificantTermsAggregatorFactory termsAggFactory;
 
     @Override
     public void collect(int doc, long owningBucketOrdinal) throws IOException {
-        super.collect(doc,owningBucketOrdinal);
+        super.collect(doc, owningBucketOrdinal);
         numCollectedDocs++;
     }
@@ -65,7 +66,7 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
 
         BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue(size);
         SignificantLongTerms.Bucket spare = null;
-        for (long i = 0; i < bucketOrds.capacity(); ++i) {
+        for (long i = 0; i < bucketOrds.capacity(); i++) {
             final long ord = bucketOrds.id(i);
             if (ord < 0) {
                 // slot is not allocated
@@ -81,10 +82,8 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
             spare.supersetDf = termsAggFactory.getBackgroundFrequency(topReader, spare.term);
             spare.supersetSize = supersetSize;
             assert spare.subsetDf <= spare.supersetDf;
-            // During shard-local down-selection we use subset/superset stats
-            // that are for this shard only
-            // Back at the central reducer these properties will be updated with
-            // global stats
+            // During shard-local down-selection we use subset/superset stats that are for this shard only
+            // Back at the central reducer these properties will be updated with global stats
             spare.updateScore();
 
             spare.bucketOrd = ord;
@@ -92,23 +91,22 @@ public class SignificantLongTermsAggregator extends LongTermsAggregator {
         }
 
         final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
+        for (int i = ordered.size() - 1; i >= 0; i--) {
             final SignificantLongTerms.Bucket bucket = (SignificantLongTerms.Bucket) ordered.pop();
             bucket.aggregations = bucketAggregations(bucket.bucketOrd);
             list[i] = bucket;
         }
         return new SignificantLongTerms(subsetSize, supersetSize, name, valuesSource.formatter(), requiredSize, minDocCount,
                 Arrays.asList(list));
-    }
+    }
 
     @Override
     public SignificantLongTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats
-        // - provide corpus size as context
+        // We need to account for the significance of a miss in our global stats - provide corpus size as context
         ContextIndexSearcher searcher = context.searchContext().searcher();
         IndexReader topReader = searcher.getIndexReader();
         int supersetSize = topReader.numDocs();
-        return new SignificantLongTerms(0, supersetSize, name, valuesSource.formatter(), requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
+        return new SignificantLongTerms(0, supersetSize, name, valuesSource.formatter(), requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
     }
 
     @Override

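Note: the comment rewritten above ("shard-local down-selection ... updated with global stats") describes two-phase scoring: each shard ranks buckets with its own subset/superset counts, and the reduce phase re-scores them once counts from all shards have been merged. The reduce code itself is not part of this diff; the sketch below only illustrates the idea, and the `long[]{subsetDf, supersetDf}` layout is an assumption made for the example.

```java
import java.util.List;

// Illustrative only: merge per-shard document frequencies and re-score with the
// same static entry point the buckets call from updateScore().
class GlobalRescoreSketch {

    static double globalScore(List<long[]> perShardDfs, long globalSubsetSize, long globalSupersetSize) {
        long subsetDf = 0;
        long supersetDf = 0;
        for (long[] dfs : perShardDfs) { // dfs = {shardSubsetDf, shardSupersetDf}, assumed layout
            subsetDf += dfs[0];
            supersetDf += dfs[1];
        }
        return InternalSignificantTerms.getSampledTermSignificance(subsetDf, globalSubsetSize, supersetDf, globalSupersetSize);
    }
}
```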
@@ -38,7 +38,7 @@ import java.util.List;
  *
  */
 public class SignificantStringTerms extends InternalSignificantTerms {
-
+
     public static final InternalAggregation.Type TYPE = new Type("significant_terms", "sigsterms");
 
     public static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@@ -54,17 +54,15 @@ public class SignificantStringTerms extends InternalSignificantTerms {
         AggregationStreams.registerStream(STREAM, TYPE.stream());
     }
-
 
     public static class Bucket extends InternalSignificantTerms.Bucket {
 
         BytesRef termBytes;
-
 
-        public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize,InternalAggregations aggregations) {
-            super(subsetDf, subsetSize,supersetDf,supersetSize,aggregations);
+        public Bucket(BytesRef term, long subsetDf, long subsetSize, long supersetDf, long supersetSize, InternalAggregations aggregations) {
+
+            super(subsetDf, subsetSize, supersetDf, supersetSize, aggregations);
             this.termBytes = term;
         }
-
 
         @Override
         public Text getKeyAsText() {
@@ -82,7 +80,6 @@ public class SignificantStringTerms extends InternalSignificantTerms {
             return BytesRef.getUTF8SortedAsUnicodeComparator().compare(termBytes, ((Bucket) other).termBytes);
         }
-
 
         @Override
         public String getKey() {
             return termBytes.utf8ToString();
@@ -92,8 +89,9 @@ public class SignificantStringTerms extends InternalSignificantTerms {
 
     SignificantStringTerms() {} // for serialization
 
-    public SignificantStringTerms(long subsetSize, long supersetSize,String name, int requiredSize, long minDocCount, Collection<InternalSignificantTerms.Bucket> buckets) {
-        super(subsetSize, supersetSize, name, requiredSize, minDocCount, buckets);
+    public SignificantStringTerms(long subsetSize, long supersetSize, String name, int requiredSize,
+                                  long minDocCount, Collection<InternalSignificantTerms.Bucket> buckets) {
+        super(subsetSize, supersetSize, name, requiredSize, minDocCount, buckets);
     }
 
     @Override
@@ -112,10 +110,9 @@ public class SignificantStringTerms extends InternalSignificantTerms {
         List<InternalSignificantTerms.Bucket> buckets = new ArrayList<InternalSignificantTerms.Bucket>(size);
         for (int i = 0; i < size; i++) {
             BytesRef term = in.readBytesRef();
-            long subsetDf= in.readVLong();
-            long supersetDf= in.readVLong();
-            buckets.add(new Bucket(term,subsetDf, subsetSize, supersetDf,
-                    supersetSize, InternalAggregations.readAggregations(in)));
+            long subsetDf = in.readVLong();
+            long supersetDf = in.readVLong();
+            buckets.add(new Bucket(term, subsetDf, subsetSize, supersetDf, supersetSize, InternalAggregations.readAggregations(in)));
         }
         this.buckets = buckets;
         this.bucketMap = null;
@@ -145,7 +142,7 @@ public class SignificantStringTerms extends InternalSignificantTerms {
         for (InternalSignificantTerms.Bucket bucket : buckets) {
             //There is a condition (presumably when only one shard has a bucket?) where reduce is not called
             // and I end up with buckets that contravene the user's min_doc_count criteria in my reducer
-            if(bucket.subsetDf>=minDocCount){
+            if (bucket.subsetDf >= minDocCount) {
                 builder.startObject();
                 builder.field(CommonFields.KEY, ((Bucket) bucket).termBytes);
                 builder.field(CommonFields.DOC_COUNT, bucket.getDocCount());

@@ -45,22 +45,24 @@ import java.util.Collections;
 public class SignificantStringTermsAggregator extends StringTermsAggregator {
 
     protected long numCollectedDocs;
-    protected SignificantTermsAggregatorFactory termsAggFactory;
+    protected final SignificantTermsAggregatorFactory termsAggFactory;
 
     public SignificantStringTermsAggregator(String name, AggregatorFactories factories, ValuesSource valuesSource,
                                             long estimatedBucketCount, int requiredSize, int shardSize, long minDocCount,
-            IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
-        super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext,
-                parent);
-        this.termsAggFactory=termsAggFactory;
+                                            IncludeExclude includeExclude, AggregationContext aggregationContext, Aggregator parent,
+                                            SignificantTermsAggregatorFactory termsAggFactory) {
+
+        super(name, factories, valuesSource, estimatedBucketCount, null, requiredSize, shardSize,
+                minDocCount, includeExclude, aggregationContext, parent);
+        this.termsAggFactory = termsAggFactory;
     }
 
     @Override
     public void collect(int doc, long owningBucketOrdinal) throws IOException {
-        super.collect(doc,owningBucketOrdinal);
+        super.collect(doc, owningBucketOrdinal);
         numCollectedDocs++;
     }
 
     @Override
     public SignificantStringTerms buildAggregation(long owningBucketOrdinal) {
         assert owningBucketOrdinal == 0;
@@ -96,7 +98,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
         }
 
         final InternalSignificantTerms.Bucket[] list = new InternalSignificantTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
+        for (int i = ordered.size() - 1; i >= 0; i--) {
             final SignificantStringTerms.Bucket bucket = (SignificantStringTerms.Bucket) ordered.pop();
             // the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
             bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
@@ -109,12 +111,11 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
 
     @Override
     public SignificantStringTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats
-        // - provide corpus size as context
+        // We need to account for the significance of a miss in our global stats - provide corpus size as context
         ContextIndexSearcher searcher = context.searchContext().searcher();
         IndexReader topReader = searcher.getIndexReader();
         int supersetSize = topReader.numDocs();
-        return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket> emptyList());
+        return new SignificantStringTerms(0, supersetSize, name, requiredSize, minDocCount, Collections.<InternalSignificantTerms.Bucket>emptyList());
     }
 
     @Override
@@ -132,8 +133,9 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
         private Ordinals.Docs ordinals;
         private LongArray ordinalToBucket;
 
-        public WithOrdinals(String name, AggregatorFactories factories, BytesValuesSource.WithOrdinals valuesSource,
-                long esitmatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent,SignificantTermsAggregatorFactory termsAggFactory) {
+        public WithOrdinals(String name, AggregatorFactories factories, BytesValuesSource.WithOrdinals valuesSource,
+                            long esitmatedBucketCount, int requiredSize, int shardSize, long minDocCount, AggregationContext aggregationContext,
+                            Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
             super(name, factories, valuesSource, esitmatedBucketCount, requiredSize, shardSize, minDocCount, null, aggregationContext, parent, termsAggFactory);
             this.valuesSource = valuesSource;
         }
@@ -161,8 +163,7 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
             for (int i = 0; i < valuesCount; ++i) {
                 final long ord = ordinals.nextOrd();
                 long bucketOrd = ordinalToBucket.get(ord);
-                if (bucketOrd < 0) { // unlikely condition on a low-cardinality
-                                     // field
+                if (bucketOrd < 0) { // unlikely condition on a low-cardinality field
                     final BytesRef bytes = bytesValues.getValueByOrd(ord);
                     final int hash = bytesValues.currentValueHash();
                     assert hash == bytes.hashCode();
@@ -176,11 +177,11 @@ public class SignificantStringTermsAggregator extends StringTermsAggregator {
                     collectBucket(doc, bucketOrd);
                 }
             }
-
+
         @Override
         public void doRelease() {
             Releasables.release(bucketOrds, termsAggFactory, ordinalToBucket);
-        }
+        }
     }
 
 }

@@ -28,13 +28,13 @@ import java.util.Collection;
 public interface SignificantTerms extends MultiBucketsAggregation, Iterable<SignificantTerms.Bucket> {
 
 
-    static abstract class Bucket implements MultiBucketsAggregation.Bucket {
+    static abstract class Bucket implements MultiBucketsAggregation.Bucket {
 
         long subsetDf;
         long subsetSize;
         long supersetDf;
         long supersetSize;
-
+
         Bucket(long subsetDf, long subsetSize, long supersetDf, long supersetSize) {
             super();
             this.subsetDf = subsetDf;
@@ -46,24 +46,24 @@ public interface SignificantTerms extends MultiBucketsAggregation, Iterable<Sign
         public abstract Number getKeyAsNumber();
 
         abstract int compareTerm(SignificantTerms.Bucket other);
 
         public abstract double getSignificanceScore();
 
-        public long getSubsetDf(){
+        public long getSubsetDf() {
             return subsetDf;
         }
 
-        public long getSupersetDf(){
+        public long getSupersetDf() {
             return supersetDf;
         }
 
-        public long getSupersetSize(){
+        public long getSupersetSize() {
             return supersetSize;
         }
 
-        public long getSubsetSize(){
+        public long getSubsetSize() {
             return subsetSize;
         }
     }
 
 }

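Note: a hedged usage sketch for the read side of the interface above. It only uses accessors visible in this diff (iteration, `getSignificanceScore()`, the subset/superset counters, and `getKey()`/`getDocCount()` from the bucket implementations); how the `SignificantTerms` instance is obtained from a search response is outside this diff.

```java
// Illustrative consumer of a SignificantTerms result: iterate buckets and print
// the statistics that drive the significance score.
class SignificantTermsPrinter {

    static void print(SignificantTerms terms) {
        for (SignificantTerms.Bucket bucket : terms) { // Iterable<SignificantTerms.Bucket>
            System.out.printf("%s: docs=%d score=%.4f subset=%d/%d superset=%d/%d%n",
                    bucket.getKey(),
                    bucket.getDocCount(),
                    bucket.getSignificanceScore(),
                    bucket.getSubsetDf(), bucket.getSubsetSize(),
                    bucket.getSupersetDf(), bucket.getSupersetSize());
        }
    }
}
```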
@@ -63,7 +63,9 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
     private BytesRefHash cachedTermOrds;
     private BigArrays bigArrays;
 
-    public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, int requiredSize, int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) {
+    public SignificantTermsAggregatorFactory(String name, ValuesSourceConfig valueSourceConfig, int requiredSize,
+                                             int shardSize, long minDocCount, IncludeExclude includeExclude, String executionHint) {
+
         super(name, SignificantStringTerms.TYPE.name(), valueSourceConfig);
         this.requiredSize = requiredSize;
         this.shardSize = shardSize;
@@ -89,9 +91,8 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
             return false;
         } else if (parent.bucketAggregationMode() == BucketAggregationMode.PER_BUCKET) {
             return true;
-        } else {
-            return hasParentBucketAggregator(parent.parent());
         }
+        return hasParentBucketAggregator(parent.parent());
     }
 
     @Override
@@ -132,10 +133,9 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
 
         if (execution.equals(EXECUTION_HINT_VALUE_ORDINALS)) {
             assert includeExclude == null;
-            return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this );
-        } else {
-            return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this);
+            return new SignificantStringTermsAggregator.WithOrdinals(name, factories, (BytesValuesSource.WithOrdinals) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this);
         }
+        return new SignificantStringTermsAggregator(name, factories, valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, includeExclude, aggregationContext, parent, this);
     }
 
     if (includeExclude != null) {
@@ -144,19 +144,18 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
         }
 
         if (valuesSource instanceof NumericValuesSource) {
 
             if (((NumericValuesSource) valuesSource).isFloatingPoint()) {
-                throw new UnsupportedOperationException("No support for examining floating point numerics");
+                throw new UnsupportedOperationException("No support for examining floating point numerics");
             }
-            return new SignificantLongTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent,this);
+            return new SignificantLongTermsAggregator(name, factories, (NumericValuesSource) valuesSource, estimatedBucketCount, requiredSize, shardSize, minDocCount, aggregationContext, parent, this);
         }
 
         throw new AggregationExecutionException("sigfnificant_terms aggregation cannot be applied to field [" + valuesSourceConfig.fieldContext().field() +
                 "]. It can only be applied to numeric or string fields.");
     }
 
-    // Many child aggs may ask for the same docFreq information so here we cache docFreq
-    // values for these terms.
+    // Many child aggs may ask for the same docFreq information so here we cache docFreq values for these terms.
     // TODO this should be re-factored into a more generic system for efficiently checking frequencies of things
     // In future we may need to a) check the frequency in a set other than the index e.g. a subset and b) check
    // the frequency of an entity other than an a single indexed term e.g. a numeric range.
@@ -179,10 +178,7 @@ public class SignificantTermsAggregatorFactory extends ValueSourceAggregatorFact
         return result;
     }
-
-
 
-    // Many child aggs may ask for the same docFreq information so cache docFreq
-    // values for these terms
+    // Many child aggs may ask for the same docFreq information so cache docFreq values for these terms
     public long getBackgroundFrequency(IndexReader topReader, long term) {
         BytesRef indexedVal = mapper.indexedValueForSearch(term);
         return getBackgroundFrequency(topReader, indexedVal);

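Note: the comments above explain why background document frequencies are cached: many child aggregations may ask for the same term. The factory itself caches into a `BytesRefHash`/`BigArrays` pair; the sketch below shows the same idea with a plain `HashMap` keyed by the indexed bytes. The field-name parameter is an assumption for the example, not taken from this diff.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;

// Illustrative docFreq cache: look the term up once against the top-level reader,
// then serve repeated requests from memory.
class BackgroundFrequencyCache {

    private final Map<BytesRef, Long> cache = new HashMap<BytesRef, Long>();
    private final String fieldName; // assumed: the field the aggregation runs on

    BackgroundFrequencyCache(String fieldName) {
        this.fieldName = fieldName;
    }

    long backgroundFrequency(IndexReader topReader, BytesRef indexedVal) throws IOException {
        Long cached = cache.get(indexedVal);
        if (cached == null) {
            // docFreq over the top-level reader = number of docs containing the term index-wide
            cached = (long) topReader.docFreq(new Term(fieldName, indexedVal));
            cache.put(BytesRef.deepCopyOf(indexedVal), cached); // copy: callers may reuse the BytesRef
        }
        return cached;
    }
}
```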
@@ -26,13 +26,12 @@ import java.io.IOException;
 
 /**
  * Creates an aggregation that finds interesting or unusual occurrences of terms in a result set.
- *
- * This feature is marked as experimental, and may be subject to change in the future. If you
+ * <p/>
+ * This feature is marked as experimental, and may be subject to change in the future. If you
  * use this feature, please let us know your experience with it!
  */
 public class SignificantTermsBuilder extends AggregationBuilder<SignificantTermsBuilder> {
-
 
     private String field;
     private int requiredSize = SignificantTermsParser.DEFAULT_REQUIRED_SIZE;
     private int shardSize = SignificantTermsParser.DEFAULT_SHARD_SIZE;
@@ -51,17 +50,17 @@ public class SignificantTermsBuilder extends AggregationBuilder<SignificantTerms
         this.requiredSize = requiredSize;
         return this;
     }
 
     public SignificantTermsBuilder shardSize(int shardSize) {
         this.shardSize = shardSize;
         return this;
     }
 
     public SignificantTermsBuilder minDocCount(int minDocCount) {
         this.minDocCount = minDocCount;
         return this;
     }
 
     @Override
     protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();

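Note: a hedged usage sketch for the builder above. Only `shardSize(...)` and `minDocCount(...)` are visible in this hunk; the name-taking constructor and the `field(...)`/`size(...)` setters for the `field` and `requiredSize` members are assumptions about the surrounding API and may be named differently.

```java
// Illustrative only; calls marked "assumed" are not shown in this diff.
class SignificantTermsRequestSketch {

    static SignificantTermsBuilder suspiciousTerms() {
        return new SignificantTermsBuilder("suspicious_terms") // assumed: aggregation name passed to the constructor
                .field("description")                          // assumed setter for the `field` member
                .size(10)                                      // assumed setter for `requiredSize`
                .shardSize(100)                                // shown above: per-shard candidate count
                .minDocCount(3);                               // shown above: matches DEFAULT_MIN_DOC_COUNT
    }
}
```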
@@ -46,16 +46,17 @@ import java.util.regex.Pattern;
  */
 public class SignificantTermsParser implements Aggregator.Parser {
 
+    public static final int DEFAULT_REQUIRED_SIZE = 10;
+    public static final int DEFAULT_SHARD_SIZE = 0;
+
+    //Typically need more than one occurrence of something for it to be statistically significant
+    public static final int DEFAULT_MIN_DOC_COUNT = 3;
+
     @Override
     public String type() {
         return SignificantStringTerms.TYPE.name();
     }
 
-    public static final int DEFAULT_REQUIRED_SIZE=10;
-    public static final int DEFAULT_SHARD_SIZE=0;
-    //Typically need more than one occurrence of something for it to be statistically significant
-    public static final int DEFAULT_MIN_DOC_COUNT = 3;
-
     @Override
     public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
@@ -68,7 +69,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
         String exclude = null;
         int excludeFlags = 0; // 0 means no flags
         String executionHint = null;
-        long minDocCount = DEFAULT_MIN_DOC_COUNT;
+        long minDocCount = DEFAULT_MIN_DOC_COUNT;
 
         XContentParser.Token token;
         String currentFieldName = null;
@@ -100,7 +101,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
                     throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
                 }
             } else if (token == XContentParser.Token.START_OBJECT) {
-                if ("include".equals(currentFieldName)) {
+                if ("include".equals(currentFieldName)) {
                     while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                         if (token == XContentParser.Token.FIELD_NAME) {
                             currentFieldName = parser.currentName();
@@ -149,7 +150,7 @@ public class SignificantTermsParser implements Aggregator.Parser {
             //some of the things we want to find have only one occurrence on each shard and as
             // such are impossible to differentiate from non-significant terms at that early stage.
             shardSize = 2 * BucketUtils.suggestShardSideQueueSize(requiredSize, context.numberOfShards());
 
         }
 
         // shard_size cannot be smaller than size as we need to at least fetch <size> entries from every shards in order to return <size>
@@ -159,12 +160,11 @@ public class SignificantTermsParser implements Aggregator.Parser {
 
         IncludeExclude includeExclude = null;
         if (include != null || exclude != null) {
-            Pattern includePattern = include != null ? Pattern.compile(include, includeFlags) : null;
+            Pattern includePattern = include != null ? Pattern.compile(include, includeFlags) : null;
             Pattern excludePattern = exclude != null ? Pattern.compile(exclude, excludeFlags) : null;
             includeExclude = new IncludeExclude(includePattern, excludePattern);
         }
-
 
         FieldMapper<?> mapper = context.smartNameFieldMapper(field);
         if (mapper == null) {
             ValuesSourceConfig<?> config = new ValuesSourceConfig<BytesValuesSource>(BytesValuesSource.class);
@@ -205,9 +205,8 @@ public class SignificantTermsParser implements Aggregator.Parser {
         config.fieldContext(new FieldContext(field, indexFieldData));
         // We need values to be unique to be able to run terms aggs efficiently
         config.ensureUnique(true);
-
 
         return new SignificantTermsAggregatorFactory(aggregationName, config, requiredSize, shardSize, minDocCount, includeExclude, executionHint);
     }
 
 }

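Note: the parser hunks above show `shard_size` being derived when the user leaves it at `DEFAULT_SHARD_SIZE = 0`: significant terms over-fetches per shard (the `2 *` factor) because rare terms can appear only once on a shard, and the comment states that `shard_size` must never be smaller than `size`. A hedged sketch of that resolution; the per-shard suggestion is passed in as a parameter because the actual `BucketUtils.suggestShardSideQueueSize` formula is not shown in this diff.

```java
// Illustrative shard_size resolution under the assumptions stated above.
class ShardSizeSketch {

    static int resolveShardSize(int requestedShardSize, int requiredSize, int suggestedPerShardQueueSize) {
        int shardSize = requestedShardSize;
        if (shardSize == 0) { // DEFAULT_SHARD_SIZE: let the parser pick a value
            // Over-fetch so that terms occurring only once per shard survive until the reduce phase.
            shardSize = 2 * suggestedPerShardQueueSize;
        }
        return Math.max(shardSize, requiredSize); // shard_size cannot be smaller than size
    }
}
```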
@@ -53,10 +53,10 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms {
 
     UnmappedSignificantTerms() {} // for serialization
 
-    public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount) {
+    public UnmappedSignificantTerms(String name, int requiredSize, long minDocCount) {
         //We pass zero for index/subset sizes because for the purpose of significant term analysis
         // we assume an unmapped index's size is irrelevant to the proceedings.
-        super(0,0,name, requiredSize, minDocCount, BUCKETS);
+        super(0, 0, name, requiredSize, minDocCount, BUCKETS);
     }
 
     @Override

@@ -34,9 +34,11 @@ public class UnmappedSignificantTermsAggregator extends Aggregator {
 
     private final int requiredSize;
     private final long minDocCount;
-    private SignificantTermsAggregatorFactory termsAggFactory;
+    private final SignificantTermsAggregatorFactory termsAggFactory;
 
-    public UnmappedSignificantTermsAggregator(String name, int requiredSize, long minDocCount, AggregationContext aggregationContext, Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
+    public UnmappedSignificantTermsAggregator(String name, int requiredSize, long minDocCount, AggregationContext aggregationContext,
+                                              Aggregator parent, SignificantTermsAggregatorFactory termsAggFactory) {
+
         super(name, BucketAggregationMode.PER_BUCKET, AggregatorFactories.EMPTY, 0, aggregationContext, parent);
         this.requiredSize = requiredSize;
         this.minDocCount = minDocCount;
@@ -71,5 +73,5 @@ public class UnmappedSignificantTermsAggregator extends Aggregator {
     protected void doRelease() {
         Releasables.release(termsAggFactory);
     }
 
 }