SignificantText aggregation - like significant_terms, but for text (#24432)

* SignificantText aggregation - like significant_terms but doesn’t require fielddata=true. It is recommended for use with the `sampler` agg to limit the expense of tokenizing docs, and it takes an optional `filter_duplicate_text`: true setting to avoid stats skew from repeated sections of text in search results (a Java usage sketch follows below).
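A minimal usage sketch in Java (the `significantText` and `filterDuplicateText` builders are added in this change; the aggregation, field and query values such as "my_sample", "keywords" and "body" are made up):

import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class SignificantTextExample {
    public static SearchSourceBuilder buildRequest() {
        // Wrap significant_text in a sampler agg so only the best-matching docs are tokenized,
        // and enable filter_duplicate_text to ignore repeated sections of text.
        SamplerAggregationBuilder sampler = AggregationBuilders.sampler("my_sample")
                .shardSize(100)
                .subAggregation(AggregationBuilders.significantText("keywords", "body")
                        .filterDuplicateText(true));
        return new SearchSourceBuilder()
                .query(QueryBuilders.matchQuery("body", "elasticsearch"))
                .aggregation(sampler);
    }
}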

Closes #23674
markharwood 2017-05-24 13:46:43 +01:00 committed by GitHub
parent b5adb3cce9
commit b7197f5e21
20 changed files with 2781 additions and 162 deletions

View File

@ -0,0 +1,201 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.analysis.miscellaneous;
import org.apache.lucene.analysis.FilteringTokenFilter;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.hash.MurmurHash3;
import java.io.IOException;
import java.util.ArrayList;
/**
* Inspects token streams for duplicate sequences of tokens. Token sequences
* have a minimum length - 6 is a good heuristic as it avoids filtering common
* idioms/phrases but detects longer sections that are typical of cut+paste
* copies of text.
*
* <p>
* Internally each token is hashed/moduloed into a single byte (so 256 possible
* values for each token) and then recorded in a trie of seen byte sequences
* using a {@link DuplicateByteSequenceSpotter}. This trie is passed into the
* TokenFilter constructor so a single object can be reused across multiple
* documents.
*
* <p>
* The emitDuplicates setting controls if duplicate tokens are filtered from
* results or are output (the {@link DuplicateSequenceAttribute} attribute can
* be used to inspect the number of prior sightings when emitDuplicates is true)
*/
public class DeDuplicatingTokenFilter extends FilteringTokenFilter {
private final DuplicateSequenceAttribute seqAtt = addAttribute(DuplicateSequenceAttribute.class);
private final boolean emitDuplicates;
static final MurmurHash3.Hash128 seed = new MurmurHash3.Hash128();
public DeDuplicatingTokenFilter(TokenStream in, DuplicateByteSequenceSpotter byteStreamDuplicateSpotter) {
this(in, byteStreamDuplicateSpotter, false);
}
/**
*
* @param in
* The input token stream
* @param byteStreamDuplicateSpotter
* object which retains trie of token sequences
* @param emitDuplicates
* true if duplicate tokens are to be emitted (use
* {@link DuplicateSequenceAttribute} attribute to inspect number
* of prior sightings of tokens as part of a sequence).
*/
public DeDuplicatingTokenFilter(TokenStream in, DuplicateByteSequenceSpotter byteStreamDuplicateSpotter, boolean emitDuplicates) {
super(new DuplicateTaggingFilter(byteStreamDuplicateSpotter, in));
this.emitDuplicates = emitDuplicates;
}
@Override
protected boolean accept() throws IOException {
return emitDuplicates || seqAtt.getNumPriorUsesInASequence() < 1;
}
private static class DuplicateTaggingFilter extends TokenFilter {
private final DuplicateSequenceAttribute seqAtt = addAttribute(DuplicateSequenceAttribute.class);
TermToBytesRefAttribute termBytesAtt = addAttribute(TermToBytesRefAttribute.class);
private DuplicateByteSequenceSpotter byteStreamDuplicateSpotter;
private ArrayList<State> allTokens;
int pos = 0;
private final int windowSize;
protected DuplicateTaggingFilter(DuplicateByteSequenceSpotter byteStreamDuplicateSpotter, TokenStream input) {
super(input);
this.byteStreamDuplicateSpotter = byteStreamDuplicateSpotter;
this.windowSize = DuplicateByteSequenceSpotter.TREE_DEPTH;
}
@Override
public final boolean incrementToken() throws IOException {
if (allTokens == null) {
loadAllTokens();
}
clearAttributes();
if (pos < allTokens.size()) {
State earlierToken = allTokens.get(pos);
pos++;
restoreState(earlierToken);
return true;
} else {
return false;
}
}
public void loadAllTokens() throws IOException {
// TODO consider changing this implementation to emit tokens as-we-go
// rather than buffering all. However this array is perhaps not the
// bulk of memory usage (in practice the dupSequenceSpotter requires
// ~5x the original content size in its internal tree ).
allTokens = new ArrayList<State>(256);
/*
* Given the bytes 123456123456 and a duplicate sequence size of 6
* the byteStreamDuplicateSpotter will only flag the final byte as
* part of a duplicate sequence due to the byte-at-a-time streaming
* nature of its assessments. When this happens we retain a buffer
* of the last 6 tokens so that we can mark the states of prior
* tokens (bytes 7 to 11) as also being duplicates
*/
pos = 0;
boolean isWrapped = false;
State priorStatesBuffer[] = new State[windowSize];
short priorMaxNumSightings[] = new short[windowSize];
int cursor = 0;
while (input.incrementToken()) {
BytesRef bytesRef = termBytesAtt.getBytesRef();
long tokenHash = MurmurHash3.hash128(bytesRef.bytes, bytesRef.offset, bytesRef.length, 0, seed).h1;
byte tokenByte = (byte) (tokenHash & 0xFF);
short numSightings = byteStreamDuplicateSpotter.addByte(tokenByte);
priorStatesBuffer[cursor] = captureState();
// Revise prior captured State objects if the latest
// token is marked as a duplicate
if (numSightings >= 1) {
int numLengthsToRecord = windowSize;
int pos = cursor;
while (numLengthsToRecord > 0) {
if (pos < 0) {
pos = windowSize - 1;
}
priorMaxNumSightings[pos] = (short) Math.max(priorMaxNumSightings[pos], numSightings);
numLengthsToRecord--;
pos--;
}
}
// Reposition cursor to next free slot
cursor++;
if (cursor >= windowSize) {
// wrap around the buffer
cursor = 0;
isWrapped = true;
}
// clean out the end of the tail that we may overwrite if the
// next iteration adds a new head
if (isWrapped) {
// cursor is now positioned on the tail - emit any valid
// tokens we may be about to overwrite in the next iteration
if (priorStatesBuffer[cursor] != null) {
recordLengthInfoState(priorMaxNumSightings, priorStatesBuffer, cursor);
}
}
} // end loop reading all tokens from stream
// Flush the buffered tokens
int pos = isWrapped ? nextAfter(cursor) : 0;
while (pos != cursor) {
recordLengthInfoState(priorMaxNumSightings, priorStatesBuffer, pos);
pos = nextAfter(pos);
}
}
private int nextAfter(int pos) {
pos++;
if (pos >= windowSize) {
pos = 0;
}
return pos;
}
private void recordLengthInfoState(short[] maxNumSightings, State[] tokenStates, int cursor) {
if (maxNumSightings[cursor] > 0) {
// We need to patch in the max sequence length we recorded at
// this position into the token state
restoreState(tokenStates[cursor]);
seqAtt.setNumPriorUsesInASequence(maxNumSightings[cursor]);
maxNumSightings[cursor] = 0;
// record the patched state
tokenStates[cursor] = captureState();
}
allTokens.add(tokenStates[cursor]);
}
}
}
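A hedged usage sketch of the filter above (the analyzer choice, field name and example strings are made up): a single DuplicateByteSequenceSpotter is shared across documents, so a boilerplate passage repeated in a later document should be dropped from that document's token stream.

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class DeDuplicatingTokenFilterExample {
    public static void main(String[] args) throws Exception {
        DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
        String boilerplate = "returns accepted within thirty days of purchase no questions asked";
        String[] docs = { "great phone " + boilerplate, "terrible battery " + boilerplate };
        try (WhitespaceAnalyzer analyzer = new WhitespaceAnalyzer()) {
            for (String doc : docs) {
                spotter.startNewSequence(); // do not chain sequences across document boundaries
                TokenStream ts = new DeDuplicatingTokenFilter(analyzer.tokenStream("body", doc), spotter);
                CharTermAttribute term = ts.addAttribute(CharTermAttribute.class);
                ts.reset();
                StringBuilder kept = new StringBuilder();
                while (ts.incrementToken()) {
                    kept.append(term).append(' ');
                }
                ts.end();
                ts.close();
                // First doc keeps all tokens; the second should drop the repeated 10-token boilerplate.
                System.out.println(kept.toString().trim());
            }
        }
    }
}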

View File

@ -0,0 +1,311 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.analysis.miscellaneous;
import org.apache.lucene.util.RamUsageEstimator;
/**
* A Trie structure for analysing byte streams for duplicate sequences. Bytes
* from a stream are added one at a time using the addByte method and the number
* of times it has been seen as part of a sequence is returned.
*
* The minimum required length for a duplicate sequence detected is 6 bytes.
*
* The design goals are to maximize speed of lookup while minimizing the space
* required to do so. This has led to a hybrid solution for representing the
* bytes that make up a sequence in the trie.
*
* If we have 6 bytes in sequence e.g. abcdef then they are represented as
* object nodes in the tree as follows:
* <p>
* (a)-(b)-(c)-(def as an int)
* <p>
*
*
* {@link RootTreeNode} objects are used for the first two levels of the tree
* (representing bytes a and b in the example sequence). The combinations of
* objects at these 2 levels are few so internally these objects allocate an
* array of 256 child node objects to quickly address children by indexing
* directly into the densely packed array using a byte value. The third level in
* the tree holds {@link LightweightTreeNode} nodes that have few children
* (typically much less than 256) and so use a dynamically-grown array to hold
* child nodes as simple int primitives. These ints represent the final 3 bytes
* of a sequence and also hold a count of the number of times the entire sequence
* path has been visited (count is a single byte).
* <p>
* The Trie grows indefinitely as more content is added and, while theoretically
* it could be massive (a depth-6 tree could produce 256^6 nodes), non-random
* content such as English text contains far fewer variations.
* <p>
* In future we may look at using one of these strategies when memory is tight:
* <ol>
* <li>auto-pruning methods to remove less-visited parts of the tree
* <li>auto-reset to wipe the whole tree and restart when a memory threshold is
* reached
* <li>halting any growth of the tree
* </ol>
*
* Tests on real-world text show that the size of the tree is a multiple of the
* input text, with that multiplier falling from roughly 10x to 5x as the content
* size increases from 10 to 100 megabytes.
*
*/
public class DuplicateByteSequenceSpotter {
public static final int TREE_DEPTH = 6;
// The maximum number of repetitions that are counted
public static final int MAX_HIT_COUNT = 255;
private final TreeNode root;
private boolean sequenceBufferFilled = false;
private final byte[] sequenceBuffer = new byte[TREE_DEPTH];
private int nextFreePos = 0;
// ==Performance info
private final int[] nodesAllocatedByDepth;
private int nodesResizedByDepth;
// ==== RAM usage estimation settings ====
private long bytesAllocated;
// Root node object plus inner-class reference to containing "this"
// (profiler suggested this was a cost)
static final long TREE_NODE_OBJECT_SIZE = RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF;
// A TreeNode specialization with an array ref (dynamically allocated and
// fixed-size)
static final long ROOT_TREE_NODE_OBJECT_SIZE = TREE_NODE_OBJECT_SIZE + RamUsageEstimator.NUM_BYTES_OBJECT_REF;
// A TreeNode specialization with an array ref (dynamically allocated
// and grown)
static final long LIGHTWEIGHT_TREE_NODE_OBJECT_SIZE = TREE_NODE_OBJECT_SIZE + RamUsageEstimator.NUM_BYTES_OBJECT_REF;
// A TreeNode specialization with a short-based hit count and a
// sequence of bytes encoded as an int
static final long LEAF_NODE_OBJECT_SIZE = TREE_NODE_OBJECT_SIZE + Short.BYTES + Integer.BYTES;
public DuplicateByteSequenceSpotter() {
this.nodesAllocatedByDepth = new int[4];
this.bytesAllocated = 0;
root = new RootTreeNode((byte) 1, null, 0);
}
/**
* Reset the sequence detection logic to avoid any continuation of the
* immediately previous bytes. A minimum of TREE_DEPTH bytes need to be
* added before any new duplicate sequences will be reported.
* Hit counts are not reset by calling this method.
*/
public void startNewSequence() {
sequenceBufferFilled = false;
nextFreePos = 0;
}
/**
* Add a byte to the sequence.
* @param b
* the next byte in a sequence
* @return number of times this byte and the preceding 5 bytes have been
* seen before as a sequence (only counts up to 255)
*
*/
public short addByte(byte b) {
// Add latest byte to circular buffer
sequenceBuffer[nextFreePos] = b;
nextFreePos++;
if (nextFreePos >= sequenceBuffer.length) {
nextFreePos = 0;
sequenceBufferFilled = true;
}
if (sequenceBufferFilled == false) {
return 0;
}
TreeNode node = root;
// replay updated sequence of bytes represented in the circular
// buffer starting from the tail
int p = nextFreePos;
// The first tier of nodes are addressed using individual bytes from the
// sequence
node = node.add(sequenceBuffer[p], 0);
p = nextBufferPos(p);
node = node.add(sequenceBuffer[p], 1);
p = nextBufferPos(p);
node = node.add(sequenceBuffer[p], 2);
// The final 3 bytes in the sequence are represented in an int
// where the 4th byte will contain a hit count.
p = nextBufferPos(p);
int sequence = 0xFF & sequenceBuffer[p];
p = nextBufferPos(p);
sequence = sequence << 8 | (0xFF & sequenceBuffer[p]);
p = nextBufferPos(p);
sequence = sequence << 8 | (0xFF & sequenceBuffer[p]);
return (short) (node.add(sequence << 8) - 1);
}
private int nextBufferPos(int p) {
p++;
if (p >= sequenceBuffer.length) {
p = 0;
}
return p;
}
/**
* Base class for nodes in the tree. Subclasses are optimised for use at
* different locations in the tree - speed-optimized nodes represent
* branches near the root while space-optimized nodes are used for deeper
* leaves/branches.
*/
abstract class TreeNode {
TreeNode(byte key, TreeNode parentNode, int depth) {
nodesAllocatedByDepth[depth]++;
}
public abstract TreeNode add(byte b, int depth);
/**
*
* @param byteSequence
* a sequence of bytes encoded as an int
* @return the number of times the full sequence has been seen (counting
* up to a maximum of MAX_HIT_COUNT).
*/
public abstract short add(int byteSequence);
}
// Node implementation for use at the root of the tree that sacrifices space
// for speed.
class RootTreeNode extends TreeNode {
// A null-or-256 sized array that can be indexed into using a byte for
// fast access.
// Being near the root of the tree it is expected that this is a
// non-sparse array.
TreeNode[] children;
RootTreeNode(byte key, TreeNode parentNode, int depth) {
super(key, parentNode, depth);
bytesAllocated += ROOT_TREE_NODE_OBJECT_SIZE;
}
public TreeNode add(byte b, int depth) {
if (children == null) {
children = new TreeNode[256];
bytesAllocated += (RamUsageEstimator.NUM_BYTES_OBJECT_REF * 256);
}
int bIndex = 0xFF & b;
TreeNode node = children[bIndex];
if (node == null) {
if (depth <= 1) {
// Depths 0 and 1 use RootTreeNode impl and create
// RootTreeNode children
node = new RootTreeNode(b, this, depth);
} else {
// Deeper-level nodes are less visited but more numerous
// so use a more space-friendly data structure
node = new LightweightTreeNode(b, this, depth);
}
children[bIndex] = node;
}
return node;
}
@Override
public short add(int byteSequence) {
throw new UnsupportedOperationException("Root nodes do not support byte sequences encoded as integers");
}
}
// Node implementation for use by the depth 3 branches of the tree that
// sacrifices speed for space.
final class LightweightTreeNode extends TreeNode {
// An array dynamically resized but frequently only sized 1 as most
// sequences leading to end leaves are one-off paths.
// It is scanned for matches sequentially and benchmarks showed
// that sorting contents on insertion didn't improve performance.
int[] children = null;
LightweightTreeNode(byte key, TreeNode parentNode, int depth) {
super(key, parentNode, depth);
bytesAllocated += LIGHTWEIGHT_TREE_NODE_OBJECT_SIZE;
}
@Override
public short add(int byteSequence) {
if (children == null) {
// Create array adding new child with the byte sequence combined with hitcount of 1.
// Most nodes at this level we expect to have only 1 child so we start with the
// smallest possible child array.
children = new int[1];
bytesAllocated += RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + Integer.BYTES;
children[0] = byteSequence + 1;
return 1;
}
// Find existing child and if discovered increment count
for (int i = 0; i < children.length; i++) {
int child = children[i];
if (byteSequence == (child & 0xFFFFFF00)) {
int hitCount = child & 0xFF;
if (hitCount < MAX_HIT_COUNT) {
children[i]++;
}
return (short) (hitCount + 1);
}
}
// Grow array adding new child
int[] newChildren = new int[children.length + 1];
bytesAllocated += Integer.BYTES;
System.arraycopy(children, 0, newChildren, 0, children.length);
children = newChildren;
// Combine the byte sequence with a hit count of 1 into an int.
children[newChildren.length - 1] = byteSequence + 1;
nodesResizedByDepth++;
return 1;
}
@Override
public TreeNode add(byte b, int depth) {
throw new UnsupportedOperationException("Leaf nodes do not take byte sequences");
}
}
public final long getEstimatedSizeInBytes() {
return bytesAllocated;
}
/**
* @return Performance info - the number of nodes allocated at each depth
*/
public int[] getNodesAllocatedByDepth() {
return nodesAllocatedByDepth.clone();
}
/**
* @return Performance info - the total number of times children arrays
* were resized
*/
public int getNodesResizedByDepth() {
return nodesResizedByDepth;
}
}
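A small sketch of driving the spotter above directly with raw bytes (the input string is arbitrary):

import java.nio.charset.StandardCharsets;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;

public class DuplicateByteSequenceSpotterExample {
    public static void main(String[] args) {
        DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
        byte[] bytes = "abcdefabcdef".getBytes(StandardCharsets.US_ASCII);
        for (byte b : bytes) {
            short priorSightings = spotter.addByte(b);
            System.out.println((char) b + " -> " + priorSightings);
        }
        // Only the final 'f' completes a 6-byte sequence ("abcdef") that was seen
        // once before, so it reports 1; every earlier byte reports 0.
        System.out.println("estimated trie size: " + spotter.getEstimatedSizeInBytes() + " bytes");
    }
}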

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.analysis.miscellaneous;
import org.apache.lucene.util.Attribute;
/**
* Provides statistics useful for detecting duplicate sections of text
*/
public interface DuplicateSequenceAttribute extends Attribute {
/**
* @return The number of times this token has been seen previously as part
* of a sequence (counts to a max of 255)
*/
short getNumPriorUsesInASequence();
void setNumPriorUsesInASequence(short len);
}
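A short sketch (token text is arbitrary) of reading this attribute when the DeDuplicatingTokenFilter is constructed with emitDuplicates=true, so duplicate tokens are kept but tagged:

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
import org.apache.lucene.analysis.miscellaneous.DuplicateSequenceAttribute;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class DuplicateSequenceAttributeExample {
    public static void main(String[] args) throws Exception {
        DuplicateByteSequenceSpotter spotter = new DuplicateByteSequenceSpotter();
        try (WhitespaceAnalyzer analyzer = new WhitespaceAnalyzer()) {
            // The second run of "a b c d e f g" repeats a 7-token sequence seen once before.
            TokenStream ts = new DeDuplicatingTokenFilter(
                    analyzer.tokenStream("body", "a b c d e f g a b c d e f g"), spotter, true);
            CharTermAttribute term = ts.addAttribute(CharTermAttribute.class);
            DuplicateSequenceAttribute seq = ts.addAttribute(DuplicateSequenceAttribute.class);
            ts.reset();
            while (ts.incrementToken()) {
                // Tokens in the first run should report 0 prior uses; tokens in the second run, 1.
                System.out.println(term + " -> " + seq.getNumPriorUsesInASequence());
            }
            ts.end();
            ts.close();
        }
    }
}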

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.analysis.miscellaneous;
import org.apache.lucene.util.AttributeImpl;
import org.apache.lucene.util.AttributeReflector;
public class DuplicateSequenceAttributeImpl extends AttributeImpl implements DuplicateSequenceAttribute {
protected short numPriorUsesInASequence = 0;
@Override
public void clear() {
numPriorUsesInASequence = 0;
}
@Override
public void copyTo(AttributeImpl target) {
DuplicateSequenceAttributeImpl t = (DuplicateSequenceAttributeImpl) target;
t.numPriorUsesInASequence = numPriorUsesInASequence;
}
@Override
public short getNumPriorUsesInASequence() {
return numPriorUsesInASequence;
}
@Override
public void setNumPriorUsesInASequence(short len) {
numPriorUsesInASequence = len;
}
@Override
public void reflectWith(AttributeReflector reflector) {
reflector.reflect(DuplicateSequenceAttribute.class, "sequenceLength", numPriorUsesInASequence);
}
}

View File

@ -132,6 +132,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;
@ -377,6 +378,8 @@ public class SearchModule {
.addResultReader(SignificantStringTerms.NAME, SignificantStringTerms::new)
.addResultReader(SignificantLongTerms.NAME, SignificantLongTerms::new)
.addResultReader(UnmappedSignificantTerms.NAME, UnmappedSignificantTerms::new));
registerAggregation(new AggregationSpec(SignificantTextAggregationBuilder.NAME, SignificantTextAggregationBuilder::new,
SignificantTextAggregationBuilder.getParser(significanceHeuristicParserRegistry)));
registerAggregation(new AggregationSpec(RangeAggregationBuilder.NAME, RangeAggregationBuilder::new,
RangeAggregationBuilder::parse).addResultReader(InternalRange::new));
registerAggregation(new AggregationSpec(DateRangeAggregationBuilder.NAME, DateRangeAggregationBuilder::new,

View File

@ -51,6 +51,7 @@ import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
@ -246,6 +247,15 @@ public class AggregationBuilders {
return new SignificantTermsAggregationBuilder(name, null);
}
/**
* Create a new {@link SignificantTextAggregationBuilder} aggregation with the given name and text field name
*/
public static SignificantTextAggregationBuilder significantText(String name, String fieldName) {
return new SignificantTextAggregationBuilder(name, fieldName);
}
/**
* Create a new {@link DateHistogramAggregationBuilder} aggregation with the given
* name.

View File

@ -55,7 +55,7 @@ public abstract class AggregatorBase extends Aggregator {
private DeferringBucketCollector recordingWrapper;
private final List<PipelineAggregator> pipelineAggregators;
private final CircuitBreakerService breakerService;
private boolean failed = false;
private long requestBytesUsed;
/**
* Constructs a new Aggregator.
@ -105,16 +105,31 @@ public abstract class AggregatorBase extends Aggregator {
return false; // unreachable
}
};
addRequestCircuitBreakerBytes(DEFAULT_WEIGHT);
}
/**
* Increment the number of bytes that have been allocated to service this request
* and potentially trigger a {@link CircuitBreakingException}. The number of bytes
* allocated is automatically decremented with the circuit breaker service on
* closure of this aggregator.
* For performance reasons subclasses should not call this millions of times,
* each with a small increment, but should instead batch up larger allocations.
*
* @param bytesAllocated the number of additional bytes allocated
* @return the cumulative size in bytes allocated by this aggregator to service this request
*/
protected long addRequestCircuitBreakerBytes(long bytesAllocated) {
try {
this.breakerService
.getBreaker(CircuitBreaker.REQUEST)
.addEstimateBytesAndMaybeBreak(DEFAULT_WEIGHT, "<agg [" + name + "]>");
.addEstimateBytesAndMaybeBreak(bytesAllocated, "<agg [" + name + "]>");
this.requestBytesUsed += bytesAllocated;
return requestBytesUsed;
} catch (CircuitBreakingException cbe) {
this.failed = true;
throw cbe;
}
}
/**
* Most aggregators don't need scores, make sure to extend this method if
* your aggregator needs them.
@ -265,9 +280,7 @@ public abstract class AggregatorBase extends Aggregator {
try {
doClose();
} finally {
if (!this.failed) {
this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-DEFAULT_WEIGHT);
}
this.breakerService.getBreaker(CircuitBreaker.REQUEST).addWithoutBreaking(-this.requestBytesUsed);
}
}
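The Javadoc for addRequestCircuitBreakerBytes above asks subclasses to batch small allocations. A minimal, self-contained sketch of that pattern (the class name and threshold are hypothetical, and the LongConsumer stands in for a call to addRequestCircuitBreakerBytes), mirroring the growth-interval approach used by SignificantTextAggregator later in this commit:

import java.util.function.LongConsumer;

final class BatchedBreakerAccounting {
    private final LongConsumer breaker;        // e.g. aggregator::addRequestCircuitBreakerBytes
    private final long reportingIntervalBytes; // hypothetical threshold, e.g. 5000
    private long reportedBytes;
    private long actualBytes;

    BatchedBreakerAccounting(LongConsumer breaker, long reportingIntervalBytes) {
        this.breaker = breaker;
        this.reportingIntervalBytes = reportingIntervalBytes;
    }

    // Accumulate a small allocation; only report to the breaker once the
    // unreported amount crosses the threshold.
    void add(long bytesAllocated) {
        actualBytes += bytesAllocated;
        long unreported = actualBytes - reportedBytes;
        if (unreported > reportingIntervalBytes) {
            breaker.accept(unreported); // may throw CircuitBreakingException
            reportedBytes = actualBytes;
        }
    }
}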

View File

@ -0,0 +1,386 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseFieldRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class SignificantTextAggregationBuilder extends AbstractAggregationBuilder<SignificantTextAggregationBuilder> {
public static final String NAME = "significant_text";
static final ParseField FIELD_NAME = new ParseField("field");
static final ParseField SOURCE_FIELDS_NAME = new ParseField("source_fields");
static final ParseField FILTER_DUPLICATE_TEXT_FIELD_NAME = new ParseField(
"filter_duplicate_text");
static final TermsAggregator.BucketCountThresholds DEFAULT_BUCKET_COUNT_THRESHOLDS =
SignificantTermsAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS;
static final SignificanceHeuristic DEFAULT_SIGNIFICANCE_HEURISTIC = SignificantTermsAggregationBuilder.DEFAULT_SIGNIFICANCE_HEURISTIC;
private String fieldName = null;
private String [] sourceFieldNames = null;
private boolean filterDuplicateText = false;
private IncludeExclude includeExclude = null;
private QueryBuilder filterBuilder = null;
private TermsAggregator.BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(
DEFAULT_BUCKET_COUNT_THRESHOLDS);
private SignificanceHeuristic significanceHeuristic = DEFAULT_SIGNIFICANCE_HEURISTIC;
public static Aggregator.Parser getParser(
ParseFieldRegistry<SignificanceHeuristicParser> significanceHeuristicParserRegistry) {
ObjectParser<SignificantTextAggregationBuilder, QueryParseContext> parser = new ObjectParser<>(
SignificantTextAggregationBuilder.NAME);
parser.declareInt(SignificantTextAggregationBuilder::shardSize,
TermsAggregationBuilder.SHARD_SIZE_FIELD_NAME);
parser.declareLong(SignificantTextAggregationBuilder::minDocCount,
TermsAggregationBuilder.MIN_DOC_COUNT_FIELD_NAME);
parser.declareLong(SignificantTextAggregationBuilder::shardMinDocCount,
TermsAggregationBuilder.SHARD_MIN_DOC_COUNT_FIELD_NAME);
parser.declareInt(SignificantTextAggregationBuilder::size,
TermsAggregationBuilder.REQUIRED_SIZE_FIELD_NAME);
parser.declareString(SignificantTextAggregationBuilder::fieldName, FIELD_NAME);
parser.declareStringArray(SignificantTextAggregationBuilder::sourceFieldNames, SOURCE_FIELDS_NAME);
parser.declareBoolean(SignificantTextAggregationBuilder::filterDuplicateText,
FILTER_DUPLICATE_TEXT_FIELD_NAME);
parser.declareObject(SignificantTextAggregationBuilder::backgroundFilter,
(p, context) -> context.parseInnerQueryBuilder(),
SignificantTermsAggregationBuilder.BACKGROUND_FILTER);
parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(v, b.includeExclude())),
IncludeExclude::parseInclude, IncludeExclude.INCLUDE_FIELD,
ObjectParser.ValueType.OBJECT_ARRAY_OR_STRING);
parser.declareField((b, v) -> b.includeExclude(IncludeExclude.merge(b.includeExclude(), v)),
IncludeExclude::parseExclude, IncludeExclude.EXCLUDE_FIELD,
ObjectParser.ValueType.STRING_ARRAY);
for (String name : significanceHeuristicParserRegistry.getNames()) {
parser.declareObject(SignificantTextAggregationBuilder::significanceHeuristic,
(p, context) -> {
SignificanceHeuristicParser significanceHeuristicParser = significanceHeuristicParserRegistry
.lookupReturningNullIfNotFound(name);
return significanceHeuristicParser.parse(context);
}, new ParseField(name));
}
return new Aggregator.Parser() {
@Override
public AggregationBuilder parse(String aggregationName, QueryParseContext context)
throws IOException {
return parser.parse(context.parser(),
new SignificantTextAggregationBuilder(aggregationName, null), context);
}
};
}
protected TermsAggregator.BucketCountThresholds getBucketCountThresholds() {
return new TermsAggregator.BucketCountThresholds(bucketCountThresholds);
}
public TermsAggregator.BucketCountThresholds bucketCountThresholds() {
return bucketCountThresholds;
}
@Override
public SignificantTextAggregationBuilder subAggregations(Builder subFactories) {
throw new AggregationInitializationException("Aggregator [" + name + "] of type ["
+ getType() + "] cannot accept sub-aggregations");
}
@Override
public SignificantTextAggregationBuilder subAggregation(AggregationBuilder aggregation) {
throw new AggregationInitializationException("Aggregator [" + name + "] of type ["
+ getType() + "] cannot accept sub-aggregations");
}
public SignificantTextAggregationBuilder bucketCountThresholds(
TermsAggregator.BucketCountThresholds bucketCountThresholds) {
if (bucketCountThresholds == null) {
throw new IllegalArgumentException(
"[bucketCountThresholds] must not be null: [" + name + "]");
}
this.bucketCountThresholds = bucketCountThresholds;
return this;
}
/**
* Sets the size - indicating how many term buckets should be returned
* (defaults to 10)
*/
public SignificantTextAggregationBuilder size(int size) {
if (size <= 0) {
throw new IllegalArgumentException(
"[size] must be greater than 0. Found [" + size + "] in [" + name + "]");
}
bucketCountThresholds.setRequiredSize(size);
return this;
}
/**
* Sets the shard_size - indicating the number of term buckets each shard
* will return to the coordinating node (the node that coordinates the
* search execution). The higher the shard size is, the more accurate the
* results are.
*/
public SignificantTextAggregationBuilder shardSize(int shardSize) {
if (shardSize <= 0) {
throw new IllegalArgumentException("[shardSize] must be greater than 0. Found ["
+ shardSize + "] in [" + name + "]");
}
bucketCountThresholds.setShardSize(shardSize);
return this;
}
/**
* Sets the name of the text field that will be the subject of this
* aggregation.
*/
public SignificantTextAggregationBuilder fieldName(String fieldName) {
this.fieldName = fieldName;
return this;
}
/**
* Selects the fields to load from _source JSON and analyze.
* If none are specified, the indexed "fieldName" value is assumed
* to also be the name of the JSON field holding the value
*/
public SignificantTextAggregationBuilder sourceFieldNames(List<String> names) {
this.sourceFieldNames = names.toArray(new String [names.size()]);
return this;
}
/**
* Controls whether duplicate paragraphs of text should be filtered from the
* statistical text analysis. Can improve results but slows down analysis.
* Default is false.
*/
public SignificantTextAggregationBuilder filterDuplicateText(boolean filterDuplicateText) {
this.filterDuplicateText = filterDuplicateText;
return this;
}
/**
* Set the minimum document count terms should have in order to appear in
* the response.
*/
public SignificantTextAggregationBuilder minDocCount(long minDocCount) {
if (minDocCount < 0) {
throw new IllegalArgumentException(
"[minDocCount] must be greater than or equal to 0. Found [" + minDocCount
+ "] in [" + name + "]");
}
bucketCountThresholds.setMinDocCount(minDocCount);
return this;
}
/**
* Set the minimum document count terms should have on the shard in order to
* appear in the response.
*/
public SignificantTextAggregationBuilder shardMinDocCount(long shardMinDocCount) {
if (shardMinDocCount < 0) {
throw new IllegalArgumentException(
"[shardMinDocCount] must be greater than or equal to 0. Found ["
+ shardMinDocCount + "] in [" + name + "]");
}
bucketCountThresholds.setShardMinDocCount(shardMinDocCount);
return this;
}
public SignificantTextAggregationBuilder backgroundFilter(QueryBuilder backgroundFilter) {
if (backgroundFilter == null) {
throw new IllegalArgumentException(
"[backgroundFilter] must not be null: [" + name + "]");
}
this.filterBuilder = backgroundFilter;
return this;
}
public QueryBuilder backgroundFilter() {
return filterBuilder;
}
/**
* Set terms to include and exclude from the aggregation results
*/
public SignificantTextAggregationBuilder includeExclude(IncludeExclude includeExclude) {
this.includeExclude = includeExclude;
return this;
}
/**
* Get terms to include and exclude from the aggregation results
*/
public IncludeExclude includeExclude() {
return includeExclude;
}
public SignificantTextAggregationBuilder significanceHeuristic(
SignificanceHeuristic significanceHeuristic) {
if (significanceHeuristic == null) {
throw new IllegalArgumentException(
"[significanceHeuristic] must not be null: [" + name + "]");
}
this.significanceHeuristic = significanceHeuristic;
return this;
}
public SignificanceHeuristic significanceHeuristic() {
return significanceHeuristic;
}
/**
* @param name
* the name of this aggregation
* @param fieldName
* the name of the text field that will be the subject of this
* aggregation
*
*/
public SignificantTextAggregationBuilder(String name, String fieldName) {
super(name);
this.fieldName = fieldName;
}
/**
* Read from a stream.
*/
public SignificantTextAggregationBuilder(StreamInput in) throws IOException {
super(in);
fieldName = in.readString();
filterDuplicateText = in.readBoolean();
bucketCountThresholds = new BucketCountThresholds(in);
filterBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
includeExclude = in.readOptionalWriteable(IncludeExclude::new);
significanceHeuristic = in.readNamedWriteable(SignificanceHeuristic.class);
sourceFieldNames = in.readOptionalStringArray();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeString(fieldName);
out.writeBoolean(filterDuplicateText);
bucketCountThresholds.writeTo(out);
out.writeOptionalNamedWriteable(filterBuilder);
out.writeOptionalWriteable(includeExclude);
out.writeNamedWriteable(significanceHeuristic);
out.writeOptionalStringArray(sourceFieldNames);
}
@Override
protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<?> parent,
Builder subFactoriesBuilder) throws IOException {
SignificanceHeuristic executionHeuristic = this.significanceHeuristic.rewrite(context);
String[] execFieldNames = sourceFieldNames;
if (execFieldNames == null) {
execFieldNames = new String[] { fieldName };
}
return new SignificantTextAggregatorFactory(name, includeExclude, filterBuilder,
bucketCountThresholds, executionHeuristic, context, parent, subFactoriesBuilder,
fieldName, execFieldNames, filterDuplicateText, metaData);
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params)
throws IOException {
builder.startObject();
bucketCountThresholds.toXContent(builder, params);
if (fieldName != null) {
builder.field(FIELD_NAME.getPreferredName(), fieldName);
}
if (sourceFieldNames != null) {
builder.array(SOURCE_FIELDS_NAME.getPreferredName(), sourceFieldNames);
}
if (filterDuplicateText) {
builder.field(FILTER_DUPLICATE_TEXT_FIELD_NAME.getPreferredName(), filterDuplicateText);
}
if (filterBuilder != null) {
builder.field(SignificantTermsAggregationBuilder.BACKGROUND_FILTER.getPreferredName(),
filterBuilder);
}
if (includeExclude != null) {
includeExclude.toXContent(builder, params);
}
significanceHeuristic.toXContent(builder, params);
builder.endObject();
return builder;
}
@Override
protected int doHashCode() {
return Objects.hash(bucketCountThresholds, fieldName, filterDuplicateText, filterBuilder,
includeExclude, significanceHeuristic, Arrays.hashCode(sourceFieldNames));
}
@Override
protected boolean doEquals(Object obj) {
SignificantTextAggregationBuilder other = (SignificantTextAggregationBuilder) obj;
return Objects.equals(bucketCountThresholds, other.bucketCountThresholds)
&& Objects.equals(fieldName, other.fieldName)
&& Arrays.equals(sourceFieldNames, other.sourceFieldNames)
&& filterDuplicateText == other.filterDuplicateText
&& Objects.equals(filterBuilder, other.filterBuilder)
&& Objects.equals(includeExclude, other.includeExclude)
&& Objects.equals(significanceHeuristic, other.significanceHeuristic);
}
@Override
public String getType() {
return NAME;
}
}
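A hedged sketch exercising a few more of the optional settings on the builder above (all names and values are made up):

import java.util.Arrays;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;

public class SignificantTextBuilderExample {
    public static SignificantTextAggregationBuilder build() {
        return AggregationBuilders.significantText("keywords", "body")  // agg name, indexed text field
                .sourceFieldNames(Arrays.asList("body", "title"))       // hypothetical _source fields to re-analyze
                .filterDuplicateText(true)
                .minDocCount(3)
                .shardSize(200)
                .backgroundFilter(QueryBuilders.termQuery("category", "reviews")); // hypothetical background set
    }
}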

View File

@ -0,0 +1,256 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.DeDuplicatingTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DuplicateByteSequenceSpotter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.highlight.TokenStreamFromTermVector;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude.StringFilter;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
public class SignificantTextAggregator extends BucketsAggregator {
private final StringFilter includeExclude;
protected final BucketCountThresholds bucketCountThresholds;
protected long numCollectedDocs;
private final BytesRefHash bucketOrds;
private final SignificanceHeuristic significanceHeuristic;
private SignificantTextAggregatorFactory termsAggFactory;
private final DocValueFormat format = DocValueFormat.RAW;
private final String fieldName;
private final String[] sourceFieldNames;
private DuplicateByteSequenceSpotter dupSequenceSpotter = null ;
private long lastTrieSize;
private static final int MEMORY_GROWTH_REPORTING_INTERVAL_BYTES = 5000;
public SignificantTextAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
BucketCountThresholds bucketCountThresholds, IncludeExclude.StringFilter includeExclude,
SignificanceHeuristic significanceHeuristic, SignificantTextAggregatorFactory termsAggFactory,
String fieldName, String [] sourceFieldNames, boolean filterDuplicateText,
Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.bucketCountThresholds = bucketCountThresholds;
this.includeExclude = includeExclude;
this.significanceHeuristic = significanceHeuristic;
this.termsAggFactory = termsAggFactory;
this.fieldName = fieldName;
this.sourceFieldNames = sourceFieldNames;
bucketOrds = new BytesRefHash(1, context.bigArrays());
if(filterDuplicateText){
dupSequenceSpotter = new DuplicateByteSequenceSpotter();
lastTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
}
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
final BytesRefBuilder previous = new BytesRefBuilder();
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
collectFromSource(doc, bucket, fieldName, sourceFieldNames);
numCollectedDocs++;
if (dupSequenceSpotter != null) {
dupSequenceSpotter.startNewSequence();
}
}
private void processTokenStream(int doc, long bucket, TokenStream ts, String fieldText) throws IOException{
if (dupSequenceSpotter != null) {
ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
}
CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
ts.reset();
try {
// Assume tokens average 5 bytes in length when sizing the expected number of tokens
BytesRefHash inDocTerms = new BytesRefHash(1+(fieldText.length()/5), context.bigArrays());
try{
while (ts.incrementToken()) {
if (dupSequenceSpotter != null) {
long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
long growth = newTrieSize - lastTrieSize;
// Only update the circuit breaker after the trie has grown by a meaningful amount, not on every token
if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
addRequestCircuitBreakerBytes(growth);
lastTrieSize = newTrieSize;
}
}
previous.clear();
previous.copyChars(termAtt);
BytesRef bytes = previous.get();
if (inDocTerms.add(bytes) >= 0) {
if (includeExclude == null || includeExclude.accept(bytes)) {
long bucketOrdinal = bucketOrds.add(bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
}
}
} finally{
Releasables.close(inDocTerms);
}
} finally{
ts.close();
}
}
private void collectFromSource(int doc, long bucket, String indexedFieldName, String[] sourceFieldNames) throws IOException {
MappedFieldType fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
if(fieldType == null){
throw new IllegalArgumentException("Aggregation [" + name + "] cannot process field ["+indexedFieldName
+"] since it is not present");
}
SourceLookup sourceLookup = context.lookup().source();
sourceLookup.setSegmentAndDocument(ctx, doc);
for (String sourceField : sourceFieldNames) {
List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
textsToHighlight = textsToHighlight.stream().map(obj -> {
if (obj instanceof BytesRef) {
return fieldType.valueForDisplay(obj).toString();
} else {
return obj;
}
}).collect(Collectors.toList());
Analyzer analyzer = fieldType.indexAnalyzer();
for (Object fieldValue : textsToHighlight) {
String fieldText = fieldValue.toString();
TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
processTokenStream(doc, bucket, ts, fieldText);
}
}
}
};
}
@Override
public SignificantStringTerms buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
long supersetSize = termsAggFactory.getSupersetNumDocs();
long subsetSize = numCollectedDocs;
BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
SignificantStringTerms.Bucket spare = null;
for (int i = 0; i < bucketOrds.size(); i++) {
final int docCount = bucketDocCount(i);
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format);
}
bucketOrds.get(i, spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
spare.supersetSize = supersetSize;
// During shard-local down-selection we use subset/superset stats
// that are for this shard only
// Back at the central reducer these properties will be updated with
// global stats
spare.updateScore(significanceHeuristic);
spare.bucketOrd = i;
spare = ordered.insertWithOverflow(spare);
}
final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; i--) {
final SignificantStringTerms.Bucket bucket = ordered.pop();
// the terms are owned by the BytesRefHash, we need to pull a copy since the BytesRef hash data may be recycled at some point
bucket.termBytes = BytesRef.deepCopyOf(bucket.termBytes);
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
return new SignificantStringTerms( name, bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(), pipelineAggregators(),
metaData(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list));
}
@Override
public SignificantStringTerms buildEmptyAggregation() {
// We need to account for the significance of a miss in our global stats - provide corpus size as context
ContextIndexSearcher searcher = context.searcher();
IndexReader topReader = searcher.getIndexReader();
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
pipelineAggregators(), metaData(), format, 0, supersetSize, significanceHeuristic, emptyList());
}
@Override
public void doClose() {
Releasables.close(bucketOrds, termsAggFactory);
}
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.index.FilterableTermsEnum;
import org.elasticsearch.common.lucene.index.FreqTermsEnum;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.BucketUtils;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class SignificantTextAggregatorFactory extends AggregatorFactory<SignificantTextAggregatorFactory>
implements Releasable {
private final IncludeExclude includeExclude;
private String indexedFieldName;
private MappedFieldType fieldType;
private final String[] sourceFieldNames;
private FilterableTermsEnum termsEnum;
private int numberOfAggregatorsCreated;
private final Query filter;
private final int supersetNumDocs;
private final TermsAggregator.BucketCountThresholds bucketCountThresholds;
private final SignificanceHeuristic significanceHeuristic;
private final DocValueFormat format = DocValueFormat.RAW;
private final boolean filterDuplicateText;
public SignificantTextAggregatorFactory(String name, IncludeExclude includeExclude,
QueryBuilder filterBuilder, TermsAggregator.BucketCountThresholds bucketCountThresholds,
SignificanceHeuristic significanceHeuristic, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, String fieldName, String [] sourceFieldNames,
boolean filterDuplicateText, Map<String, Object> metaData) throws IOException {
super(name, context, parent, subFactoriesBuilder, metaData);
this.includeExclude = includeExclude;
this.filter = filterBuilder == null
? null
: filterBuilder.toQuery(context.getQueryShardContext());
this.indexedFieldName = fieldName;
this.sourceFieldNames = sourceFieldNames;
this.filterDuplicateText = filterDuplicateText;
IndexSearcher searcher = context.searcher();
// Important - need to use the doc count that includes deleted docs
// or we have this issue: https://github.com/elastic/elasticsearch/issues/7951
this.supersetNumDocs = filter == null
? searcher.getIndexReader().maxDoc()
: searcher.count(filter);
this.bucketCountThresholds = bucketCountThresholds;
this.significanceHeuristic = significanceHeuristic;
fieldType = context.getQueryShardContext().fieldMapper(indexedFieldName);
}
/**
* Get the number of docs in the superset.
*/
public long getSupersetNumDocs() {
return supersetNumDocs;
}
private FilterableTermsEnum getTermsEnum(String field) throws IOException {
if (termsEnum != null) {
return termsEnum;
}
IndexReader reader = context.searcher().getIndexReader();
if (numberOfAggregatorsCreated > 1) {
termsEnum = new FreqTermsEnum(reader, field, true, false, filter, context.bigArrays());
} else {
termsEnum = new FilterableTermsEnum(reader, indexedFieldName, PostingsEnum.NONE, filter);
}
return termsEnum;
}
private long getBackgroundFrequency(String value) throws IOException {
Query query = fieldType.termQuery(value, context.getQueryShardContext());
if (query instanceof TermQuery) {
// for types that use the inverted index, we prefer using a caching terms
// enum that will do a better job at reusing index inputs
Term term = ((TermQuery) query).getTerm();
FilterableTermsEnum termsEnum = getTermsEnum(term.field());
if (termsEnum.seekExact(term.bytes())) {
return termsEnum.docFreq();
} else {
return 0;
}
}
// otherwise do it the naive way
if (filter != null) {
query = new BooleanQuery.Builder()
.add(query, Occur.FILTER)
.add(filter, Occur.FILTER)
.build();
}
return context.searcher().count(query);
}
public long getBackgroundFrequency(BytesRef termBytes) throws IOException {
String value = format.format(termBytes);
return getBackgroundFrequency(value);
}
@Override
public void close() {
try {
if (termsEnum instanceof Releasable) {
((Releasable) termsEnum).close();
}
} finally {
termsEnum = null;
}
}
@Override
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
if (collectsFromSingleBucket == false) {
return asMultiBucketAggregator(this, context, parent);
}
numberOfAggregatorsCreated++;
BucketCountThresholds bucketCountThresholds = new BucketCountThresholds(this.bucketCountThresholds);
if (bucketCountThresholds.getShardSize() == SignificantTextAggregationBuilder.DEFAULT_BUCKET_COUNT_THRESHOLDS.getShardSize()) {
// The user has not made a shardSize selection.
// Use default heuristic to avoid any wrong-ranking caused by
// distributed counting but request double the usual amount.
// We typically need more than the number of "top" terms requested
// by other aggregations as the significance algorithm is in less
// of a position to down-select at shard-level - some of the things
// we want to find have only one occurrence on each shard and as
// such are impossible to differentiate from non-significant terms
// at that early stage.
bucketCountThresholds.setShardSize(2 * BucketUtils.suggestShardSideQueueSize(bucketCountThresholds.getRequiredSize(),
context.numberOfShards()));
}
// TODO - need to check with mapping that this is indeed a text field....
IncludeExclude.StringFilter incExcFilter = includeExclude == null ? null:
includeExclude.convertToStringFilter(DocValueFormat.RAW);
return new SignificantTextAggregator(name, factories, context, parent, pipelineAggregators, bucketCountThresholds,
incExcFilter, significanceHeuristic, this, indexedFieldName, sourceFieldNames, filterDuplicateText, metaData);
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.analysis.miscellaneous;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockTokenizer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class DeDuplicatingTokenFilterTests extends ESTestCase {
public void testSimple() throws IOException {
DuplicateByteSequenceSpotter bytesDeDuper = new DuplicateByteSequenceSpotter();
Analyzer analyzer = new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
Tokenizer t = new MockTokenizer(MockTokenizer.WHITESPACE, false);
return new TokenStreamComponents(t, new DeDuplicatingTokenFilter(t, bytesDeDuper));
}
};
String input = "a b c 1 2 3 4 5 6 7 a b c d 1 2 3 4 5 6 7 e f 1 2 3 4 5 6 7";
String expectedOutput = "a b c 1 2 3 4 5 6 7 a b c d e f";
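        // The 7-token run "1 2 3 4 5 6 7" appears three times in the input; the second and third
        // occurrences are spotted as duplicate sequences and removed, so only first-time tokens
        // survive in the expected output above.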
TokenStream test = analyzer.tokenStream("test", input);
CharTermAttribute termAttribute = test.addAttribute(CharTermAttribute.class);
test.reset();
StringBuilder sb = new StringBuilder();
while (test.incrementToken()) {
sb.append(termAttribute.toString());
sb.append(" ");
}
String output = sb.toString().trim();
assertThat(output, equalTo(expectedOutput));
}
public void testHitCountLimits() throws IOException {
DuplicateByteSequenceSpotter bytesDeDuper = new DuplicateByteSequenceSpotter();
long peakMemoryUsed = 0;
for (int i = 0; i < DuplicateByteSequenceSpotter.MAX_HIT_COUNT * 2; i++) {
Analyzer analyzer = new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
Tokenizer t = new MockTokenizer(MockTokenizer.WHITESPACE, false);
return new TokenStreamComponents(t, new DeDuplicatingTokenFilter(t, bytesDeDuper, true));
}
};
try {
String input = "1 2 3 4 5 6";
bytesDeDuper.startNewSequence();
TokenStream test = analyzer.tokenStream("test", input);
DuplicateSequenceAttribute dsa = test.addAttribute(DuplicateSequenceAttribute.class);
test.reset();
while (test.incrementToken()) {
assertEquals(Math.min(DuplicateByteSequenceSpotter.MAX_HIT_COUNT, i), dsa.getNumPriorUsesInASequence());
}
if (i == 0) {
peakMemoryUsed = bytesDeDuper.getEstimatedSizeInBytes();
} else {
// Given we are feeding the same content repeatedly the
// actual memory
// used by bytesDeDuper should not grow
assertEquals(peakMemoryUsed, bytesDeDuper.getEstimatedSizeInBytes());
}
} finally {
analyzer.close();
}
}
}
public void testTaggedFrequencies() throws IOException {
DuplicateByteSequenceSpotter bytesDeDuper = new DuplicateByteSequenceSpotter();
Analyzer analyzer = new Analyzer() {
@Override
protected TokenStreamComponents createComponents(String fieldName) {
Tokenizer t = new MockTokenizer(MockTokenizer.WHITESPACE, false);
return new TokenStreamComponents(t, new DeDuplicatingTokenFilter(t, bytesDeDuper, true));
}
};
try {
String input = "a b c 1 2 3 4 5 6 7 a b c d 1 2 3 4 5 6 7 e f 1 2 3 4 5 6 7";
short[] expectedFrequencies = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 2, 2, 2, 2, 2, 2, 2 };
TokenStream test = analyzer.tokenStream("test", input);
DuplicateSequenceAttribute seqAtt = test.addAttribute(DuplicateSequenceAttribute.class);
test.reset();
for (int i = 0; i < expectedFrequencies.length; i++) {
assertThat(test.incrementToken(), equalTo(true));
assertThat(seqAtt.getNumPriorUsesInASequence(), equalTo(expectedFrequencies[i]));
}
assertThat(test.incrementToken(), equalTo(false));
} finally {
analyzer.close();
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -68,6 +69,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantTerms;
import static org.elasticsearch.search.aggregations.AggregationBuilders.significantText;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
@ -102,7 +104,22 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
String type = randomBoolean() ? "text" : "long";
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
SharedSignificantTermsTestMethods.index01Docs(type, settings, this);
SearchResponse response = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
SearchRequestBuilder request;
if ("text".equals(type) && randomBoolean()) {
// Use significant_text on text fields but occasionally run with alternative of
// significant_terms on legacy fieldData=true too.
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
terms("class")
.field(CLASS_FIELD)
.subAggregation((significantText("sig_terms", TEXT_FIELD))
.significanceHeuristic(new SimpleHeuristic())
.minDocCount(1)
)
);
} else {
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
terms("class")
.field(CLASS_FIELD)
@ -111,9 +128,10 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
.significanceHeuristic(new SimpleHeuristic())
.minDocCount(1)
)
)
.execute()
.actionGet();
);
}
SearchResponse response = request.execute().actionGet();
assertSearchResponse(response);
StringTerms classes = response.getAggregations().get("class");
assertThat(classes.getBuckets().size(), equalTo(2));
@ -135,18 +153,7 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
// we run the same test again but this time we do not call assertSearchResponse() before the assertions
// the reason is that this would trigger toXContent and we would like to check that this has no potential side effects
response = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
terms("class")
.field(CLASS_FIELD)
.subAggregation((significantTerms("sig_terms"))
.field(TEXT_FIELD)
.significanceHeuristic(new SimpleHeuristic())
.minDocCount(1)
)
)
.execute()
.actionGet();
response = request.execute().actionGet();
classes = (StringTerms) response.getAggregations().get("class");
assertThat(classes.getBuckets().size(), equalTo(2));
@ -261,10 +268,23 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
String type = randomBoolean() ? "text" : "long";
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
SharedSignificantTermsTestMethods.index01Docs(type, settings, this);
SearchResponse response = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(terms("class").field(CLASS_FIELD).subAggregation(significantTerms("sig_terms").field(TEXT_FIELD)))
.execute()
.actionGet();
SearchRequestBuilder request;
if ("text".equals(type) && randomBoolean() ) {
// Use significant_text on text fields but occasionally run with alternative of
// significant_terms on legacy fieldData=true too.
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(terms("class").field(CLASS_FIELD)
.subAggregation(significantText("sig_terms", TEXT_FIELD)));
} else {
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(terms("class").field(CLASS_FIELD)
.subAggregation(significantTerms("sig_terms").field(TEXT_FIELD)));
}
SearchResponse response = request.execute().actionGet();
assertSearchResponse(response);
StringTerms classes = response.getAggregations().get("class");
assertThat(classes.getBuckets().size(), equalTo(2));
@ -347,25 +367,39 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
}
indexRandom(true, false, indexRequestBuilderList);
client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
SearchRequestBuilder request;
if (randomBoolean()) {
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
terms("class")
.field(CLASS_FIELD)
.subAggregation(
significantTerms("sig_terms")
.field(TEXT_FIELD)
.minDocCount(1)))
.execute()
.actionGet();
.minDocCount(1)));
} else {
request = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(
terms("class")
.field(CLASS_FIELD)
.subAggregation(
significantText("sig_terms", TEXT_FIELD)
.minDocCount(1)));
}
request.execute().actionGet();
}
public void testBackgroundVsSeparateSet() throws Exception {
String type = randomBoolean() ? "text" : "long";
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
SharedSignificantTermsTestMethods.index01Docs(type, settings, this);
testBackgroundVsSeparateSet(new MutualInformation(true, true), new MutualInformation(true, false));
testBackgroundVsSeparateSet(new ChiSquare(true, true), new ChiSquare(true, false));
testBackgroundVsSeparateSet(new GND(true), new GND(false));
testBackgroundVsSeparateSet(new MutualInformation(true, true), new MutualInformation(true, false), type);
testBackgroundVsSeparateSet(new ChiSquare(true, true), new ChiSquare(true, false), type);
testBackgroundVsSeparateSet(new GND(true), new GND(false), type);
}
// compute significance score by
@ -373,9 +407,23 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
// 2. filter buckets and set the background to the other class and set is_background false
// both should yield exact same result
public void testBackgroundVsSeparateSet(SignificanceHeuristic significanceHeuristicExpectingSuperset,
SignificanceHeuristic significanceHeuristicExpectingSeparateSets) throws Exception {
SignificanceHeuristic significanceHeuristicExpectingSeparateSets,
String type) throws Exception {
SearchResponse response1 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
final boolean useSigText = randomBoolean() && type.equals("text");
SearchRequestBuilder request1;
if (useSigText) {
request1 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(terms("class")
.field(CLASS_FIELD)
.subAggregation(
significantText("sig_terms", TEXT_FIELD)
.minDocCount(1)
.significanceHeuristic(
significanceHeuristicExpectingSuperset)));
} else {
request1 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(terms("class")
.field(CLASS_FIELD)
.subAggregation(
@ -383,11 +431,28 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
.field(TEXT_FIELD)
.minDocCount(1)
.significanceHeuristic(
significanceHeuristicExpectingSuperset)))
.execute()
.actionGet();
significanceHeuristicExpectingSuperset)));
}
SearchResponse response1 = request1.execute().actionGet();
assertSearchResponse(response1);
SearchResponse response2 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
SearchRequestBuilder request2;
if (useSigText) {
request2 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(filter("0", QueryBuilders.termQuery(CLASS_FIELD, "0"))
.subAggregation(significantText("sig_terms", TEXT_FIELD)
.minDocCount(1)
.backgroundFilter(QueryBuilders.termQuery(CLASS_FIELD, "1"))
.significanceHeuristic(significanceHeuristicExpectingSeparateSets)))
.addAggregation(filter("1", QueryBuilders.termQuery(CLASS_FIELD, "1"))
.subAggregation(significantText("sig_terms", TEXT_FIELD)
.minDocCount(1)
.backgroundFilter(QueryBuilders.termQuery(CLASS_FIELD, "0"))
.significanceHeuristic(significanceHeuristicExpectingSeparateSets)));
} else {
request2 = client().prepareSearch(INDEX_NAME).setTypes(DOC_TYPE)
.addAggregation(filter("0", QueryBuilders.termQuery(CLASS_FIELD, "0"))
.subAggregation(significantTerms("sig_terms")
.field(TEXT_FIELD)
@ -399,9 +464,10 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
.field(TEXT_FIELD)
.minDocCount(1)
.backgroundFilter(QueryBuilders.termQuery(CLASS_FIELD, "0"))
.significanceHeuristic(significanceHeuristicExpectingSeparateSets)))
.execute()
.actionGet();
.significanceHeuristic(significanceHeuristicExpectingSeparateSets)));
}
SearchResponse response2 = request2.execute().actionGet();
StringTerms classes = response1.getAggregations().get("class");
@ -438,14 +504,24 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
public void testScoresEqualForPositiveAndNegative(SignificanceHeuristic heuristic) throws Exception {
//check that results for both classes are the same with exclude negatives = false and classes are routing ids
SearchResponse response = client().prepareSearch("test")
SearchRequestBuilder request;
if (randomBoolean()) {
request = client().prepareSearch("test")
.addAggregation(terms("class").field("class").subAggregation(significantTerms("mySignificantTerms")
.field("text")
.executionHint(randomExecutionHint())
.significanceHeuristic(heuristic)
.minDocCount(1).shardSize(1000).size(1000)))
.execute()
.actionGet();
.minDocCount(1).shardSize(1000).size(1000)));
} else {
request = client().prepareSearch("test")
.addAggregation(terms("class").field("class").subAggregation(significantText("mySignificantTerms", "text")
.significanceHeuristic(heuristic)
.minDocCount(1).shardSize(1000).size(1000)));
}
SearchResponse response = request.execute().actionGet();
assertSearchResponse(response);
StringTerms classes = response.getAggregations().get("class");
assertThat(classes.getBuckets().size(), equalTo(2));
@ -499,17 +575,28 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
}
public void testScriptScore() throws ExecutionException, InterruptedException, IOException {
indexRandomFrequencies01(randomBoolean() ? "text" : "long");
String type = randomBoolean() ? "text" : "long";
indexRandomFrequencies01(type);
ScriptHeuristic scriptHeuristic = getScriptSignificanceHeuristic();
SearchResponse response = client().prepareSearch(INDEX_NAME)
SearchRequestBuilder request;
if ("text".equals(type) && randomBoolean()) {
request = client().prepareSearch(INDEX_NAME)
.addAggregation(terms("class").field(CLASS_FIELD)
.subAggregation(significantText("mySignificantTerms", TEXT_FIELD)
.significanceHeuristic(scriptHeuristic)
.minDocCount(1).shardSize(2).size(2)));
} else {
request = client().prepareSearch(INDEX_NAME)
.addAggregation(terms("class").field(CLASS_FIELD)
.subAggregation(significantTerms("mySignificantTerms")
.field(TEXT_FIELD)
.executionHint(randomExecutionHint())
.significanceHeuristic(scriptHeuristic)
.minDocCount(1).shardSize(2).size(2)))
.execute()
.actionGet();
.minDocCount(1).shardSize(2).size(2)));
}
SearchResponse response = request.execute().actionGet();
assertSearchResponse(response);
for (Terms.Bucket classBucket : ((Terms) response.getAggregations().get("class")).getBuckets()) {
SignificantTerms sigTerms = classBucket.getAggregations().get("mySignificantTerms");
@ -577,8 +664,15 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
// Test that a request using a script does not get cached
ScriptHeuristic scriptHeuristic = getScriptSignificanceHeuristic();
SearchResponse r = client().prepareSearch("cache_test_idx").setSize(0)
boolean useSigText = randomBoolean();
SearchResponse r;
if (useSigText) {
r = client().prepareSearch("cache_test_idx").setSize(0)
.addAggregation(significantText("foo", "s").significanceHeuristic(scriptHeuristic)).get();
} else {
r = client().prepareSearch("cache_test_idx").setSize(0)
.addAggregation(significantTerms("foo").field("s").significanceHeuristic(scriptHeuristic)).get();
}
assertSearchResponse(r);
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache()
@ -588,7 +682,11 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
// To make sure that the cache is working test that a request not using
// a script is cached
if (useSigText) {
r = client().prepareSearch("cache_test_idx").setSize(0).addAggregation(significantText("foo", "s")).get();
} else {
r = client().prepareSearch("cache_test_idx").setSize(0).addAggregation(significantTerms("foo").field("s")).get();
}
assertSearchResponse(r);
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache()
@ -597,4 +695,6 @@ public class SignificantTermsSignificanceScoreIT extends ESIntegTestCase {
.getMissCount(), equalTo(1L));
}
}

View File

@ -103,6 +103,47 @@ public class SignificantTermsTests extends BaseAggregationTestCase<SignificantTe
factory.format("###.##");
}
if (randomBoolean()) {
IncludeExclude incExc = getIncludeExclude();
factory.includeExclude(incExc);
}
if (randomBoolean()) {
SignificanceHeuristic significanceHeuristic = getSignificanceHeuristic();
factory.significanceHeuristic(significanceHeuristic);
}
if (randomBoolean()) {
factory.backgroundFilter(QueryBuilders.termsQuery("foo", "bar"));
}
return factory;
}
static SignificanceHeuristic getSignificanceHeuristic() {
SignificanceHeuristic significanceHeuristic = null;
switch (randomInt(5)) {
case 0:
significanceHeuristic = new PercentageScore();
break;
case 1:
significanceHeuristic = new ChiSquare(randomBoolean(), randomBoolean());
break;
case 2:
significanceHeuristic = new GND(randomBoolean());
break;
case 3:
significanceHeuristic = new MutualInformation(randomBoolean(), randomBoolean());
break;
case 4:
significanceHeuristic = new ScriptHeuristic(mockScript("foo"));
break;
case 5:
significanceHeuristic = new JLHScore();
break;
default:
fail();
}
return significanceHeuristic;
}
static IncludeExclude getIncludeExclude() {
IncludeExclude incExc = null;
switch (randomInt(5)) {
case 0:
@ -148,38 +189,7 @@ public class SignificantTermsTests extends BaseAggregationTestCase<SignificantTe
default:
fail();
}
factory.includeExclude(incExc);
}
if (randomBoolean()) {
SignificanceHeuristic significanceHeuristic = null;
switch (randomInt(5)) {
case 0:
significanceHeuristic = new PercentageScore();
break;
case 1:
significanceHeuristic = new ChiSquare(randomBoolean(), randomBoolean());
break;
case 2:
significanceHeuristic = new GND(randomBoolean());
break;
case 3:
significanceHeuristic = new MutualInformation(randomBoolean(), randomBoolean());
break;
case 4:
significanceHeuristic = new ScriptHeuristic(mockScript("foo"));
break;
case 5:
significanceHeuristic = new JLHScore();
break;
default:
fail();
}
factory.significanceHeuristic(significanceHeuristic);
}
if (randomBoolean()) {
factory.backgroundFilter(QueryBuilders.termsQuery("foo", "bar"));
}
return factory;
return incExc;
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
import java.util.Arrays;
public class SignificantTextTests extends BaseAggregationTestCase<SignificantTextAggregationBuilder> {
@Override
protected SignificantTextAggregationBuilder createTestAggregatorBuilder() {
String name = randomAlphaOfLengthBetween(3, 20);
String field = randomAlphaOfLengthBetween(3, 20);
SignificantTextAggregationBuilder factory = new SignificantTextAggregationBuilder(name, field);
if (randomBoolean()) {
factory.bucketCountThresholds().setRequiredSize(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
factory.sourceFieldNames(Arrays.asList(new String []{"foo", "bar"}));
}
if (randomBoolean()) {
factory.bucketCountThresholds().setShardSize(randomIntBetween(1, Integer.MAX_VALUE));
}
if (randomBoolean()) {
int minDocCount = randomInt(4);
switch (minDocCount) {
case 0:
break;
case 1:
case 2:
case 3:
case 4:
minDocCount = randomIntBetween(0, Integer.MAX_VALUE);
break;
}
factory.bucketCountThresholds().setMinDocCount(minDocCount);
}
if (randomBoolean()) {
int shardMinDocCount = randomInt(4);
switch (shardMinDocCount) {
case 0:
break;
case 1:
case 2:
case 3:
case 4:
shardMinDocCount = randomIntBetween(0, Integer.MAX_VALUE);
break;
default:
fail();
}
factory.bucketCountThresholds().setShardMinDocCount(shardMinDocCount);
}
factory.filterDuplicateText(randomBoolean());
if (randomBoolean()) {
IncludeExclude incExc = SignificantTermsTests.getIncludeExclude();
factory.includeExclude(incExc);
}
if (randomBoolean()) {
SignificanceHeuristic significanceHeuristic = SignificantTermsTests.getSignificanceHeuristic();
factory.significanceHeuristic(significanceHeuristic);
}
if (randomBoolean()) {
factory.backgroundFilter(QueryBuilders.termsQuery("foo", "bar"));
}
return factory;
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket.significant;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms.Bucket;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTextAggregationBuilder;
import java.io.IOException;
import java.util.Arrays;
public class SignificantTextAggregatorTests extends AggregatorTestCase {
/**
* Uses the significant text aggregation to find the keywords in text fields
*/
public void testSignificance() throws IOException {
TextFieldType textFieldType = new TextFieldType();
textFieldType.setName("text");
textFieldType.setIndexAnalyzer(new NamedAnalyzer("my_analyzer", AnalyzerScope.GLOBAL, new StandardAnalyzer()));
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
indexWriterConfig.setMaxBufferedDocs(100);
indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a
// single segment with
// predictable docIds
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
for (int i = 0; i < 10; i++) {
Document doc = new Document();
StringBuilder text = new StringBuilder("common ");
if (i % 2 == 0) {
text.append("odd ");
} else {
text.append("even separator" + i + " duplicate duplicate duplicate duplicate duplicate duplicate ");
}
doc.add(new Field("text", text.toString(), textFieldType));
String json ="{ \"text\" : \"" + text.toString() + "\","+
" \"json_only_field\" : \"" + text.toString() + "\"" +
" }";
doc.add(new StoredField("_source", new BytesRef(json)));
w.addDocument(doc);
}
SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text").filterDuplicateText(true);
if(randomBoolean()){
sigAgg.sourceFieldNames(Arrays.asList(new String [] {"json_only_field"}));
}
SamplerAggregationBuilder aggBuilder = new SamplerAggregationBuilder("sampler")
.subAggregation(sigAgg);
try (IndexReader reader = DirectoryReader.open(w)) {
assertEquals("test expects a single segment", 1, reader.leaves().size());
IndexSearcher searcher = new IndexSearcher(reader);
// Search "odd" which should have no duplication
Sampler sampler = searchAndReduce(searcher, new TermQuery(new Term("text", "odd")), aggBuilder, textFieldType);
SignificantTerms terms = sampler.getAggregations().get("sig_text");
assertNull(terms.getBucketByKey("even"));
assertNull(terms.getBucketByKey("duplicate"));
assertNull(terms.getBucketByKey("common"));
assertNotNull(terms.getBucketByKey("odd"));
// Search "even" which will have duplication
sampler = searchAndReduce(searcher, new TermQuery(new Term("text", "even")), aggBuilder, textFieldType);
terms = sampler.getAggregations().get("sig_text");
assertNull(terms.getBucketByKey("odd"));
assertNull(terms.getBucketByKey("duplicate"));
assertNull(terms.getBucketByKey("common"));
assertNull(terms.getBucketByKey("separator2"));
assertNull(terms.getBucketByKey("separator4"));
assertNull(terms.getBucketByKey("separator6"));
assertNotNull(terms.getBucketByKey("even"));
}
}
}
}

View File

@ -338,6 +338,58 @@ for (int i = 0; i < 50; i++) {
}
buildRestTests.setups['stackoverflow'] += """
"""
// Used by significant_text aggregation docs
buildRestTests.setups['news'] = '''
- do:
indices.create:
index: news
body:
settings:
number_of_shards: 1
number_of_replicas: 1
mappings:
article:
properties:
source:
type: keyword
content:
type: text
- do:
bulk:
index: news
type: article
refresh: true
body: |'''
// Make h5n1 strongly connected to bird flu
for (int i = 0; i < 100; i++) {
buildRestTests.setups['news'] += """
{"index":{}}
{"source": "very_relevant_$i", "content": "bird flu h5n1"}"""
}
for (int i = 0; i < 100; i++) {
buildRestTests.setups['news'] += """
{"index":{}}
{"source": "filler_$i", "content": "bird dupFiller "}"""
}
for (int i = 0; i < 100; i++) {
buildRestTests.setups['news'] += """
{"index":{}}
{"source": "filler_$i", "content": "flu dupFiller "}"""
}
for (int i = 0; i < 20; i++) {
buildRestTests.setups['news'] += """
{"index":{}}
{"source": "partially_relevant_$i", "content": "elasticsearch dupFiller dupFiller dupFiller dupFiller pozmantier"}"""
}
for (int i = 0; i < 10; i++) {
buildRestTests.setups['news'] += """
{"index":{}}
{"source": "partially_relevant_$i", "content": "elasticsearch logstash kibana"}"""
}
buildRestTests.setups['news'] += """
"""
// Used by some aggregations
buildRestTests.setups['exams'] = '''

View File

@ -49,5 +49,7 @@ include::bucket/sampler-aggregation.asciidoc[]
include::bucket/significantterms-aggregation.asciidoc[]
include::bucket/significanttext-aggregation.asciidoc[]
include::bucket/terms-aggregation.asciidoc[]

View File

@ -0,0 +1,487 @@
[[search-aggregations-bucket-significanttext-aggregation]]
=== Significant Text Aggregation
experimental[]
An aggregation that returns interesting or unusual occurrences of free-text terms in a set.
It is like the <<search-aggregations-bucket-significantterms-aggregation,significant terms>> aggregation but differs in that:
* It is specifically designed for use on type `text` fields
* It does not require field data or doc-values
* It re-analyzes text content on-the-fly, meaning it can also filter out duplicate sections of
noisy text that would otherwise tend to skew statistics.
WARNING: Re-analyzing _large_ result sets will require a lot of time and memory. It is recommended that the significant_text
aggregation is used as a child of either the <<search-aggregations-bucket-sampler-aggregation,sampler>> or
<<search-aggregations-bucket-diversified-sampler-aggregation,diversified sampler>> aggregation to limit the analysis
to a _small_ selection of top-matching documents e.g. 200. This will typically improve speed, memory use and quality of
results.
.Example use cases:
* Suggesting "H5N1" when users search for "bird flu" to help expand queries
* Suggesting keywords relating to stock symbol $ATI for use in an automated news classifier
In these cases the words being selected are not simply the most popular terms in results. The most popular words tend to be
very boring (_and, of, the, we, I, they_ ...).
The significant words are the ones that have undergone a significant change in popularity measured between a _foreground_ and _background_ set.
If the term "H5N1" only exists in 5 documents in a 10 million document index and yet is found in 4 of the 100 documents that make up a user's search results
that is significant and probably very relevant to their search. 5/10,000,000 vs 4/100 is a big swing in frequency.
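The following toy snippet (illustrative only; it is not the JLH heuristic or any of the other configurable scoring formulas) shows the kind of foreground-versus-background frequency comparison described above, using the H5N1 numbers:
[source,java]
--------------------------------------------------
public class FrequencySwingSketch {
    public static void main(String[] args) {
        // "h5n1" appears in 4 of the 100 sampled foreground documents...
        double foregroundRate = 4.0 / 100.0;
        // ...but in only 5 of the 10,000,000 documents in the background index.
        double backgroundRate = 5.0 / 10_000_000.0;
        // The term is roughly 80,000 times more frequent in the sample than in the index.
        System.out.println("frequency swing: " + (foregroundRate / backgroundRate));
    }
}
--------------------------------------------------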
experimental[The `significant_text` aggregation is new and may change in non-backwards compatible ways if we add further text-analysis features e.g. phrase detection]
==== Basic use
In the typical use case, the _foreground_ set of interest is a selection of the top-matching search results for a query
and the _background_ set used for statistical comparisons is the index or indices from which the results were gathered.
Example:
[source,js]
--------------------------------------------------
GET news/article/_search
{
"query" : {
"match" : {"content" : "Bird flu"}
},
"aggregations" : {
"my_sample" : {
"sampler" : {
"shard_size" : 100
},
"aggregations": {
"keywords" : {
"significant_text" : { "field" : "content" }
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]
Response:
[source,js]
--------------------------------------------------
{
"took": 9,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations" : {
"my_sample": {
"doc_count": 100,
"keywords" : {
"doc_count": 100,
"buckets" : [
{
"key": "h5n1",
"doc_count": 4,
"score": 4.71235374214817,
"bg_count": 5
}
...
]
}
}
}
}
--------------------------------------------------
// NOTCONSOLE
The results show that "h5n1" is one of several terms strongly associated with bird flu.
It only occurs 5 times in our index as a whole (see the `bg_count`) and yet 4 of these
were lucky enough to appear in our 100 document sample of "bird flu" results. That suggests
a significant word and one which the user can potentially add to their search.
==== Dealing with noisy data using `filter_duplicate_text`
Free-text fields often contain a mix of original content and mechanical copies of text (cut-and-paste biographies, email reply chains,
retweets, boilerplate headers/footers, page navigation menus, sidebar news links, copyright notices, standard disclaimers, addresses).
In real-world data these duplicate sections of text tend to feature heavily in `significant_text` results if they aren't filtered out.
Filtering near-duplicate text is a difficult task at index-time but we can cleanse the data on-the-fly at query time using the
`filter_duplicate_text` setting.
First let's look at an unfiltered real-world example using the http://research.signalmedia.co/newsir16/signal-dataset.html[Signal media dataset] of
a million news articles covering a wide variety of news. Here are the raw significant text results for a search for the articles
mentioning "elasticsearch":
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"sample": {
"doc_count": 35,
"keywords": {
"doc_count": 35,
"buckets": [
{
"key": "elasticsearch",
"doc_count": 35,
"score": 28570.428571428572,
"bg_count": 35
},
...
{
"key": "currensee",
"doc_count": 8,
"score": 6530.383673469388,
"bg_count": 8
},
...
{
"key": "pozmantier",
"doc_count": 4,
"score": 3265.191836734694,
"bg_count": 4
},
...
}
--------------------------------------------------
// NOTCONSOLE
The uncleansed documents have thrown up some odd-looking terms that are, on the face of it, statistically
correlated with appearances of our search term "elasticsearch", e.g. "pozmantier".
We can drill down into examples of these documents to see why pozmantier is connected using this query:
[source,js]
--------------------------------------------------
GET news/article/_search
{
"query": {
"simple_query_string": {
"query": "+elasticsearch +pozmantier"
}
},
"_source": [
"title",
"source"
],
"highlight": {
"fields": {
"content": {}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]
The results show a series of very similar news articles about a judging panel for a number of tech projects:
[source,js]
--------------------------------------------------
{
...
"hits": {
"hits": [
{
...
"_source": {
"source": "Presentation Master",
"title": "T.E.N. Announces Nominees for the 2015 ISE® North America Awards"
},
"highlight": {
"content": [
"City of San Diego Mike <em>Pozmantier</em>, Program Manager, Cyber Security Division, Department of",
" Janus, Janus <em>ElasticSearch</em> Security Visualization Engine "
]
}
},
{
...
"_source": {
"source": "RCL Advisors",
"title": "T.E.N. Announces Nominees for the 2015 ISE(R) North America Awards"
},
"highlight": {
"content": [
"Mike <em>Pozmantier</em>, Program Manager, Cyber Security Division, Department of Homeland Security S&T",
"Janus, Janus <em>ElasticSearch</em> Security Visualization Engine"
]
}
},
...
--------------------------------------------------
// NOTCONSOLE
Mike Pozmantier was one of many judges on a panel and elasticsearch was used in one of many projects being judged.
As is typical, this lengthy press release was cut and pasted by a variety of news sites and, consequently, any rare names, numbers or
typos it contains become statistically correlated with our matching query.
Fortunately, similar documents tend to rank similarly, so while examining the stream of top-matching documents the significant_text
aggregation can apply a filter to remove sequences of 6 or more tokens that have already been seen. Let's try the same query again,
this time with the `filter_duplicate_text` setting turned on:
[source,js]
--------------------------------------------------
GET news/article/_search
{
"query": {
"match": {
"content": "elasticsearch"
}
},
"aggs": {
"sample": {
"sampler": {
"shard_size": 100
},
"aggs": {
"keywords": {
"significant_text": {
"field": "content",
"filter_duplicate_text": true
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]
The results from analysing our deduplicated text are obviously of higher quality to anyone familiar with the Elastic Stack:
[source,js]
--------------------------------------------------
{
...
"aggregations": {
"sample": {
"doc_count": 35,
"keywords": {
"doc_count": 35,
"buckets": [
{
"key": "elasticsearch",
"doc_count": 22,
"score": 11288.001166180758,
"bg_count": 35
},
{
"key": "logstash",
"doc_count": 3,
"score": 1836.648979591837,
"bg_count": 4
},
{
"key": "kibana",
"doc_count": 3,
"score": 1469.3020408163263,
"bg_count": 5
}
]
}
}
}
}
--------------------------------------------------
// NOTCONSOLE
Mr Pozmantier and other terms whose only association with elasticsearch was a consequence of copy-and-paste operations or other
forms of mechanical repetition no longer appear in the aggregation results.
If your duplicate or near-duplicate content is identifiable via a single-value indexed field (perhaps
a hash of the article's `title` text or an `original_press_release_url` field) then it would be more
efficient to use a parent <<search-aggregations-bucket-diversified-sampler-aggregation,diversified sampler>> aggregation
to eliminate these documents from the sample set based on that single key. The less duplicate content you can feed into
the significant_text aggregation up front the better in terms of performance.
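To make the filtering idea more concrete, here is a minimal sketch (assumed class and method names, not the production token filter used by the aggregation) of dropping tokens that belong to a 6-token window that has already been seen:
[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NaiveDuplicateSequenceFilter {
    private static final int WINDOW = 6;
    private final Set<String> seenWindows = new HashSet<>();

    // Returns the tokens with any previously-seen 6-token sequence removed.
    public List<String> filter(List<String> tokens) {
        boolean[] duplicate = new boolean[tokens.size()];
        for (int i = 0; i + WINDOW <= tokens.size(); i++) {
            String window = String.join(" ", tokens.subList(i, i + WINDOW));
            if (seenWindows.add(window) == false) {
                for (int j = i; j < i + WINDOW; j++) {
                    duplicate[j] = true;    // every token of a repeated window is dropped
                }
            }
        }
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < tokens.size(); i++) {
            if (duplicate[i] == false) {
                kept.add(tokens.get(i));
            }
        }
        return kept;
    }
}
--------------------------------------------------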
.How are the significance scores calculated?
**********************************
The numbers returned for scores are primarily intended for ranking different suggestions sensibly rather than being something easily
understood by end users. The scores are derived from the doc frequencies in the _foreground_ and _background_ sets. In brief, a
term is considered significant if there is a noticeable difference in the frequency with which it appears in the subset and
in the background. The way the terms are ranked can be configured; see the "Parameters" section.
**********************************
.Use the _"like this but not this"_ pattern
**********************************
You can spot mis-categorized content by first searching a structured field, e.g. `category:adultMovie`, and using significant_text on the
free-text `movie_description` field. Take the suggested words (I'll leave them to your imagination) and then search for all movies NOT marked as `category:adultMovie` but containing these keywords.
You now have a ranked list of badly-categorized movies that you should reclassify or at least remove from the "familyFriendly" category.
The significance score from each term can also provide a useful `boost` setting to sort matches.
Using the `minimum_should_match` setting of the `terms` query with the keywords will help control the balance of precision and recall in the result set, i.e.
a high setting would return a small number of relevant results packed full of keywords, while a setting of "1" would produce a more exhaustive result set containing all documents with _any_ keyword. A sketch of such a follow-up query is shown after this panel.
**********************************
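One possible shape for the follow-up search described in the panel above; the `movie_db` index name and the keyword values are hypothetical, and the keywords are expressed here as a `bool` query of `term` clauses with a `minimum_should_match` constraint:
[source,js]
--------------------------------------------------
GET movie_db/_search
{
  "query": {
    "bool": {
      "should": [
        { "term": { "movie_description": "keyword1" } },
        { "term": { "movie_description": "keyword2" } },
        { "term": { "movie_description": "keyword3" } }
      ],
      "minimum_should_match": 1,
      "must_not": [
        { "term": { "category": "adultMovie" } }
      ]
    }
  }
}
--------------------------------------------------
// NOTCONSOLE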
==== Limitations
===== No support for child aggregations
The significant_text aggregation intentionally does not support the addition of child aggregations because:
* It would come with a high memory cost
* It isn't a generally useful feature and there is a workaround for those that need it
The volume of candidate terms is generally very high and these are pruned heavily before the final
results are returned. Supporting child aggregations would generate additional churn and be inefficient.
Clients can always take the heavily-trimmed set of results from a `significant_text` request and
make a subsequent follow-up query using a `terms` aggregation with an `include` clause and child
aggregations to perform further analysis of selected keywords in a more efficient fashion.
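For illustration, such a follow-up request might look like the one below. Note that, unlike significant_text, a `terms` aggregation on a `text` field needs fielddata or a suitable keyword sub-field, so the field and included terms here are only indicative:
[source,js]
--------------------------------------------------
GET news/article/_search
{
  "query": { "match": { "content": "elasticsearch" } },
  "aggs": {
    "keywords": {
      "terms": {
        "field": "content",
        "include": ["logstash", "kibana"]
      },
      "aggs": {
        "top_articles": { "top_hits": { "size": 1 } }
      }
    }
  }
}
--------------------------------------------------
// NOTCONSOLE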
===== Approximate counts
The counts of how many documents contain a term, provided in the results, are based on summing the samples returned from each shard and
as such may be:
* low if certain shards did not provide figures for a given term in their top sample
* high when considering the background frequency as it may count occurrences found in deleted documents
Like most design decisions, this is the basis of a trade-off in which we have chosen to provide fast performance at the cost of some (typically small) inaccuracies.
However, the `size` and `shard_size` settings covered in the next section provide tools to help control the accuracy levels.
==== Parameters
===== Significance heuristics
This aggregation supports the same scoring heuristics (JLH, mutual_information, gnd, chi_square, etc.) as the <<search-aggregations-bucket-significantterms-aggregation,significant terms>> aggregation.
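For example, assuming the same request syntax as significant_terms, the JLH heuristic can be requested explicitly (the empty object enables it with its default behaviour):
[source,js]
--------------------------------------------------
GET news/article/_search
{
  "query": { "match": { "content": "bird flu" } },
  "aggs": {
    "keywords": {
      "significant_text": {
        "field": "content",
        "jlh": {}
      }
    }
  }
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]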
===== Size & Shard Size
The `size` parameter can be set to define how many term buckets should be returned out of the overall terms list. By
default, the node coordinating the search process will request each shard to provide its own top term buckets
and once all shards respond, it will reduce the results to the final list that will then be returned to the client.
If the number of unique terms is greater than `size`, the returned list can be slightly off and not accurate
(it could be that the term counts are slightly off and it could even be that a term that should have been in the top
`size` buckets was not returned).
To ensure better accuracy a multiple of the final `size` is used as the number of terms to request from each shard
using a heuristic based on the number of shards. To take manual control of this setting the `shard_size` parameter
can be used to control the volumes of candidate terms produced by each shard.
Low-frequency terms can turn out to be the most interesting ones once all results are combined, so the
significant_text aggregation can produce higher-quality results when the `shard_size` parameter is set to
values significantly higher than the `size` setting. This ensures that a bigger volume of promising candidate terms is given
a consolidated review by the reducing node before the final selection. Obviously large candidate term lists
will cause extra network traffic and RAM usage, so this is a quality/cost trade-off that needs to be balanced. If `shard_size` is set to -1 (the default) then `shard_size` will be automatically estimated based on the number of shards and the `size` parameter.
NOTE: `shard_size` cannot be smaller than `size` (as it doesn't make much sense). When it is, Elasticsearch will
override it and reset it to be equal to `size`.
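For example, to take manual control of both settings (the values here are purely illustrative):
[source,js]
--------------------------------------------------
GET news/article/_search
{
  "query": { "match": { "content": "bird flu" } },
  "aggs": {
    "keywords": {
      "significant_text": {
        "field": "content",
        "size": 10,
        "shard_size": 200
      }
    }
  }
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]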
===== Minimum document count
It is possible to only return terms that match more than a configured number of hits using the `min_doc_count` option.
The default value is 3.
Terms that score highly will be collected on a shard level and merged with the terms collected from other shards in a second step.
However, the shard does not have the information about the global term frequencies available. The decision whether a term is added to a
candidate list depends only on the score computed on the shard using local shard frequencies, not the global frequencies of the word.
The `min_doc_count` criterion is only applied after merging the local term statistics of all shards. In a way the decision to add the
term as a candidate is made without being very _certain_ that the term will actually reach the required `min_doc_count`.
This might cause many (globally) high-frequency terms to be missing in the final result if low-frequency but high-scoring terms populated
the candidate lists.
However, this increases memory consumption and network traffic.
===== `shard_min_doc_count` parameter
The parameter `shard_min_doc_count` regulates the _certainty_ a shard has, with respect to `min_doc_count`, about whether a term should
actually be added to the candidate list or not. Terms will only be considered if their local shard frequency within the set is higher than the
`shard_min_doc_count`. If your dictionary contains many low-frequency words and you are not interested in these (for example misspellings),
then you can set the `shard_min_doc_count` parameter to filter out candidate terms at the shard level that will, with reasonable certainty,
not reach the required `min_doc_count` even after merging the local frequencies. `shard_min_doc_count` is set to `1` by default and has
no effect unless you explicitly set it.
WARNING: Setting `min_doc_count` to `1` is generally not advised as it tends to return terms that
are typos or other bizarre curiosities. Finding more than one instance of a term helps
reinforce that, while still rare, the term was not the result of a one-off accident. The
default value of 3 is used to provide a minimum weight-of-evidence.
Setting `shard_min_doc_count` too high will cause significant candidate terms to be filtered out on a shard level.
This value should be set much lower than `min_doc_count/#shards`.
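For example, both thresholds can be set on the aggregation directly (illustrative values):
[source,js]
--------------------------------------------------
GET news/article/_search
{
  "query": { "match": { "content": "bird flu" } },
  "aggs": {
    "keywords": {
      "significant_text": {
        "field": "content",
        "min_doc_count": 4,
        "shard_min_doc_count": 2
      }
    }
  }
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]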
===== Custom background context
The default source of statistical information for background term frequencies is the entire index and this
scope can be narrowed through the use of a `background_filter` to focus in on significant terms within a narrower
context:
[source,js]
--------------------------------------------------
GET news/article/_search
{
"query" : {
"match" : {
"content" : "madrid"
}
},
"aggs" : {
"tags" : {
"significant_text" : {
"field" : "content",
"background_filter": {
"term" : { "content" : "spain"}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]
The above filter would help focus in on terms that were peculiar to the city of Madrid rather than revealing
terms like "Spanish" that are unusual in the full index's worldwide context but commonplace in the subset of documents containing the
word "Spain".
WARNING: Use of background filters will slow the query as each term's postings must be filtered to determine a frequency.
===== Dealing with source and index mappings
Ordinarily the indexed field name and the original JSON field being retrieved share the same name.
However, with more complex field mappings using features like `copy_to`, the source
JSON field(s) and the indexed field being aggregated can differ.
In these cases it is possible to list the JSON _source fields from which text
will be analyzed using the `source_fields` parameter:
[source,js]
--------------------------------------------------
GET news/article/_search
{
"query" : {
"match" : {
"custom_all" : "elasticsearch"
}
},
"aggs" : {
"tags" : {
"significant_text" : {
"field" : "custom_all",
"source_fields": ["content" , "title"]
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]
===== Filtering Values
It is possible (although rarely required) to filter the values for which buckets will be created. This can be done using the `include` and
`exclude` parameters which are based on a regular expression string or arrays of exact terms. This functionality mirrors the features
described in the <<search-aggregations-bucket-terms-aggregation,terms aggregation>> documentation.
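For example, buckets can be restricted to terms matching a regular expression, or specific terms can be excluded (the patterns and values below are illustrative):
[source,js]
--------------------------------------------------
GET news/article/_search
{
  "query": { "match": { "content": "bird flu" } },
  "aggs": {
    "keywords": {
      "significant_text": {
        "field": "content",
        "include": "h5.*",
        "exclude": ["bird", "flu"]
      }
    }
  }
}
--------------------------------------------------
// CONSOLE
// TEST[setup:news]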

View File

@ -7,7 +7,7 @@ description of a product. These fields are `analyzed`, that is they are passed t
before being indexed. The analysis process allows Elasticsearch to search for
individual words _within_ each full text field. Text fields are not
used for sorting and seldom used for aggregations (although the
<<search-aggregations-bucket-significantterms-aggregation,significant terms aggregation>>
<<search-aggregations-bucket-significanttext-aggregation,significant text aggregation>>
is a notable exception).
If you need to index structured content such as email addresses, hostnames, status

View File

@ -0,0 +1,166 @@
---
"Default index":
- do:
indices.create:
index: goodbad
body:
settings:
number_of_shards: "1"
mappings:
doc:
properties:
text:
type: text
fielddata: false
class:
type: keyword
- do:
index:
index: goodbad
type: doc
id: 1
body: { text: "good", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 2
body: { text: "good", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 3
body: { text: "bad", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 4
body: { text: "bad", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 5
body: { text: "good bad", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 6
body: { text: "good bad", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 7
body: { text: "bad", class: "bad" }
- do:
indices.refresh:
index: [goodbad]
- do:
search:
index: goodbad
type: doc
- match: {hits.total: 7}
- do:
search:
index: goodbad
type: doc
body: {"aggs": {"class": {"terms": {"field": "class"},"aggs": {"sig_text": {"significant_text": {"field": "text"}}}}}}
- match: {aggregations.class.buckets.0.sig_text.buckets.0.key: "bad"}
- match: {aggregations.class.buckets.1.sig_text.buckets.0.key: "good"}
---
"Dedup noise":
- do:
indices.create:
index: goodbad
body:
settings:
number_of_shards: "1"
mappings:
doc:
properties:
text:
type: text
fielddata: false
class:
type: keyword
- do:
index:
index: goodbad
type: doc
id: 1
body: { text: "good noisewords1 g1 g2 g3 g4 g5 g6", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 2
body: { text: "good noisewords2 g1 g2 g3 g4 g5 g6", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 3
body: { text: "bad noisewords3 b1 b2 b3 b4 b5 b6", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 4
body: { text: "bad noisewords4 b1 b2 b3 b4 b5 b6", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 5
body: { text: "good bad noisewords5 gb1 gb2 gb3 gb4 gb5 gb6", class: "good" }
- do:
index:
index: goodbad
type: doc
id: 6
body: { text: "good bad noisewords6 gb1 gb2 gb3 gb4 gb5 gb6", class: "bad" }
- do:
index:
index: goodbad
type: doc
id: 7
body: { text: "bad noisewords7 b1 b2 b3 b4 b5 b6", class: "bad" }
- do:
indices.refresh:
index: [goodbad]
- do:
search:
index: goodbad
type: doc
- match: {hits.total: 7}
- do:
search:
index: goodbad
type: doc
body: {"aggs": {"class": {"terms": {"field": "class"},"aggs": {"sig_text": {"significant_text": {"field": "text", "filter_duplicate_text": true}}}}}}
- match: {aggregations.class.buckets.0.sig_text.buckets.0.key: "bad"}
- length: { aggregations.class.buckets.0.sig_text.buckets: 1 }
- match: {aggregations.class.buckets.1.sig_text.buckets.0.key: "good"}
- length: { aggregations.class.buckets.1.sig_text.buckets: 1 }