Aggregations bug: Significant_text fails on arrays of text. (#25030)

* Aggregations bug: Significant_text fails on arrays of text.
The set of previously-seen tokens in a doc was allocated per-JSON-field string value rather than once per JSON document meaning the number of docs containing a term could be over-counted leading to exceptions from the checks in significance heuristics. Added unit test for this scenario

Closes #25029
This commit is contained in:
markharwood 2017-06-12 14:02:54 +01:00 committed by GitHub
parent 7ab3d5d04a
commit 518cda6637
2 changed files with 76 additions and 44 deletions

View File

@ -113,45 +113,40 @@ public class SignificantTextAggregator extends BucketsAggregator {
}
}
private void processTokenStream(int doc, long bucket, TokenStream ts, String fieldText) throws IOException{
private void processTokenStream(int doc, long bucket, TokenStream ts, BytesRefHash inDocTerms, String fieldText)
throws IOException{
if (dupSequenceSpotter != null) {
ts = new DeDuplicatingTokenFilter(ts, dupSequenceSpotter);
}
CharTermAttribute termAtt = ts.addAttribute(CharTermAttribute.class);
ts.reset();
try {
//Assume tokens will average 5 bytes in length to size number of tokens
BytesRefHash inDocTerms = new BytesRefHash(1+(fieldText.length()/5), context.bigArrays());
try{
while (ts.incrementToken()) {
if (dupSequenceSpotter != null) {
long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
long growth = newTrieSize - lastTrieSize;
// Only update the circuitbreaker after
if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
addRequestCircuitBreakerBytes(growth);
lastTrieSize = newTrieSize;
}
while (ts.incrementToken()) {
if (dupSequenceSpotter != null) {
long newTrieSize = dupSequenceSpotter.getEstimatedSizeInBytes();
long growth = newTrieSize - lastTrieSize;
// Only update the circuitbreaker after
if (growth > MEMORY_GROWTH_REPORTING_INTERVAL_BYTES) {
addRequestCircuitBreakerBytes(growth);
lastTrieSize = newTrieSize;
}
previous.clear();
previous.copyChars(termAtt);
BytesRef bytes = previous.get();
if (inDocTerms.add(bytes) >= 0) {
if (includeExclude == null || includeExclude.accept(bytes)) {
long bucketOrdinal = bucketOrds.add(bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
previous.clear();
previous.copyChars(termAtt);
BytesRef bytes = previous.get();
if (inDocTerms.add(bytes) >= 0) {
if (includeExclude == null || includeExclude.accept(bytes)) {
long bucketOrdinal = bucketOrds.add(bytes);
if (bucketOrdinal < 0) { // already seen
bucketOrdinal = -1 - bucketOrdinal;
collectExistingBucket(sub, doc, bucketOrdinal);
} else {
collectBucket(sub, doc, bucketOrdinal);
}
}
}
} finally{
Releasables.close(inDocTerms);
}
} finally{
ts.close();
}
@ -166,23 +161,28 @@ public class SignificantTextAggregator extends BucketsAggregator {
SourceLookup sourceLookup = context.lookup().source();
sourceLookup.setSegmentAndDocument(ctx, doc);
BytesRefHash inDocTerms = new BytesRefHash(256, context.bigArrays());
for (String sourceField : sourceFieldNames) {
List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
textsToHighlight = textsToHighlight.stream().map(obj -> {
if (obj instanceof BytesRef) {
return fieldType.valueForDisplay(obj).toString();
} else {
return obj;
}
}).collect(Collectors.toList());
Analyzer analyzer = fieldType.indexAnalyzer();
for (Object fieldValue : textsToHighlight) {
String fieldText = fieldValue.toString();
TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
processTokenStream(doc, bucket, ts, fieldText);
}
try {
for (String sourceField : sourceFieldNames) {
List<Object> textsToHighlight = sourceLookup.extractRawValues(sourceField);
textsToHighlight = textsToHighlight.stream().map(obj -> {
if (obj instanceof BytesRef) {
return fieldType.valueForDisplay(obj).toString();
} else {
return obj;
}
}).collect(Collectors.toList());
Analyzer analyzer = fieldType.indexAnalyzer();
for (Object fieldValue : textsToHighlight) {
String fieldText = fieldValue.toString();
TokenStream ts = analyzer.tokenStream(indexedFieldName, fieldText);
processTokenStream(doc, bucket, ts, inDocTerms, fieldText);
}
}
} finally{
Releasables.close(inDocTerms);
}
}
};

View File

@ -123,4 +123,36 @@ public class SignificantTextAggregatorTests extends AggregatorTestCase {
}
}
}
/**
* Test documents with arrays of text
*/
public void testSignificanceOnTextArrays() throws IOException {
TextFieldType textFieldType = new TextFieldType();
textFieldType.setName("text");
textFieldType.setIndexAnalyzer(new NamedAnalyzer("my_analyzer", AnalyzerScope.GLOBAL, new StandardAnalyzer()));
IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
for (int i = 0; i < 10; i++) {
Document doc = new Document();
doc.add(new Field("text", "foo", textFieldType));
String json ="{ \"text\" : [\"foo\",\"foo\"], \"title\" : [\"foo\", \"foo\"]}";
doc.add(new StoredField("_source", new BytesRef(json)));
w.addDocument(doc);
}
SignificantTextAggregationBuilder sigAgg = new SignificantTextAggregationBuilder("sig_text", "text");
sigAgg.sourceFieldNames(Arrays.asList(new String [] {"title", "text"}));
try (IndexReader reader = DirectoryReader.open(w)) {
assertEquals("test expects a single segment", 1, reader.leaves().size());
IndexSearcher searcher = new IndexSearcher(reader);
searchAndReduce(searcher, new TermQuery(new Term("text", "foo")), sigAgg, textFieldType);
// No significant results to be found in this test - only checking we don't end up
// with the internal exception discovered in issue https://github.com/elastic/elasticsearch/issues/25029
}
}
}
}