aggs: fix significant terms reduce for long terms

Significant terms were not reduced correctly if they were long terms.
Also, clean up the bwc test a little. Upgrades are not needed.

related to #13522
This commit is contained in:
Britta Weber 2015-11-23 18:28:55 +01:00
parent 866bed0864
commit 556d80501c
3 changed files with 68 additions and 26 deletions

View File

@ -173,7 +173,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation; InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
for (Bucket bucket : terms.buckets) { for (Bucket bucket : terms.buckets) {
List<Bucket> existingBuckets = buckets.get(bucket.getKey()); List<Bucket> existingBuckets = buckets.get(bucket.getKeyAsString());
if (existingBuckets == null) { if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size()); existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(bucket.getKeyAsString(), existingBuckets); buckets.put(bucket.getKeyAsString(), existingBuckets);

View File

@ -48,24 +48,16 @@ public class SignificantTermsBackwardCompatibilityIT extends ESBackcompatTestCas
static final String CLASS_FIELD = "class"; static final String CLASS_FIELD = "class";
/** /**
* Simple upgrade test for streaming significant terms buckets * Test for streaming significant terms buckets to old es versions.
*/ */
public void testBucketStreaming() throws IOException, ExecutionException, InterruptedException { public void testBucketStreaming() throws IOException, ExecutionException, InterruptedException {
logger.debug("testBucketStreaming: indexing documents"); logger.debug("testBucketStreaming: indexing documents");
String type = randomBoolean() ? "string" : "long"; String type = randomBoolean() ? "string" : "long";
String settings = "{\"index.number_of_shards\": 5, \"index.number_of_replicas\": 0}"; String settings = "{\"index.number_of_shards\": 5, \"index.number_of_replicas\": 0}";
index01Docs(type, settings); index01Docs(type, settings);
ensureGreen();
logClusterState(); logClusterState();
boolean upgraded; checkSignificantTermsAggregationCorrect();
int upgradedNodesCounter = 1;
do {
logger.debug("testBucketStreaming: upgrading {}st node", upgradedNodesCounter++);
upgraded = backwardsCluster().upgradeOneNode();
ensureGreen();
logClusterState();
checkSignificantTermsAggregationCorrect();
} while (upgraded);
logger.debug("testBucketStreaming: done testing significant terms while upgrading"); logger.debug("testBucketStreaming: done testing significant terms while upgrading");
} }
@ -101,7 +93,7 @@ public class SignificantTermsBackwardCompatibilityIT extends ESBackcompatTestCas
.execute() .execute()
.actionGet(); .actionGet();
assertSearchResponse(response); assertSearchResponse(response);
StringTerms classes = (StringTerms) response.getAggregations().get("class"); StringTerms classes = response.getAggregations().get("class");
assertThat(classes.getBuckets().size(), equalTo(2)); assertThat(classes.getBuckets().size(), equalTo(2));
for (Terms.Bucket classBucket : classes.getBuckets()) { for (Terms.Bucket classBucket : classes.getBuckets()) {
Map<String, Aggregation> aggs = classBucket.getAggregations().asMap(); Map<String, Aggregation> aggs = classBucket.getAggregations().asMap();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.ChiSquare;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.GND;
@ -38,6 +39,8 @@ import org.elasticsearch.search.aggregations.bucket.significant.heuristics.Signi
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicBuilder; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParserMapper; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParserMapper;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.test.TestSearchContext;
@ -45,18 +48,11 @@ import org.elasticsearch.test.TestSearchContext;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/** /**
* *
@ -83,24 +79,28 @@ public class SignificanceHeuristicTests extends ESTestCase {
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(version); out.setVersion(version);
sigTerms[0].writeTo(out); sigTerms[0].writeTo(out);
// read // read
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(version); in.setVersion(version);
sigTerms[1].readFrom(in); sigTerms[1].readFrom(in);
assertTrue(sigTerms[1].significanceHeuristic.equals(sigTerms[0].significanceHeuristic)); assertTrue(sigTerms[1].significanceHeuristic.equals(sigTerms[0].significanceHeuristic));
InternalSignificantTerms.Bucket originalBucket = (InternalSignificantTerms.Bucket) sigTerms[0].buckets.get(0);
InternalSignificantTerms.Bucket streamedBucket = (InternalSignificantTerms.Bucket) sigTerms[1].buckets.get(0);
assertThat(originalBucket.getKeyAsString(), equalTo(streamedBucket.getKeyAsString()));
assertThat(originalBucket.getSupersetDf(), equalTo(streamedBucket.getSupersetDf()));
assertThat(originalBucket.getSubsetDf(), equalTo(streamedBucket.getSubsetDf()));
assertThat(streamedBucket.getSubsetSize(), equalTo(10l));
assertThat(streamedBucket.getSupersetSize(), equalTo(20l));
} }
InternalSignificantTerms[] getRandomSignificantTerms(SignificanceHeuristic heuristic) { InternalSignificantTerms[] getRandomSignificantTerms(SignificanceHeuristic heuristic) {
InternalSignificantTerms[] sTerms = new InternalSignificantTerms[2]; InternalSignificantTerms[] sTerms = new InternalSignificantTerms[2];
ArrayList<InternalSignificantTerms.Bucket> buckets = new ArrayList<>(); ArrayList<InternalSignificantTerms.Bucket> buckets = new ArrayList<>();
if (randomBoolean()) { if (randomBoolean()) {
BytesRef term = new BytesRef("123.0");
buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, null)); buckets.add(new SignificantLongTerms.Bucket(1, 2, 3, 4, 123, InternalAggregations.EMPTY, null));
sTerms[0] = new SignificantLongTerms(10, 20, "some_name", null, 1, 1, heuristic, buckets, sTerms[0] = new SignificantLongTerms(10, 20, "some_name", null, 1, 1, heuristic, buckets,
Collections.EMPTY_LIST, null); Collections.EMPTY_LIST, null);
@ -125,6 +125,56 @@ public class SignificanceHeuristicTests extends ESTestCase {
return heuristics.get(randomInt(3)); return heuristics.get(randomInt(3));
} }
public void testReduce() {
List<InternalAggregation> aggs = createInternalAggregations();
SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, null);
assertThat(reducedAgg.getBuckets().size(), equalTo(2));
assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8l));
assertThat(reducedAgg.getBuckets().get(0).getSubsetSize(), equalTo(16l));
assertThat(reducedAgg.getBuckets().get(0).getSupersetDf(), equalTo(10l));
assertThat(reducedAgg.getBuckets().get(0).getSupersetSize(), equalTo(30l));
assertThat(reducedAgg.getBuckets().get(1).getSubsetDf(), equalTo(8l));
assertThat(reducedAgg.getBuckets().get(1).getSubsetSize(), equalTo(16l));
assertThat(reducedAgg.getBuckets().get(1).getSupersetDf(), equalTo(10l));
assertThat(reducedAgg.getBuckets().get(1).getSupersetSize(), equalTo(30l));
}
// Create aggregations as they might come from three different shards and return as list.
private List<InternalAggregation> createInternalAggregations() {
String type = randomBoolean() ? "long" : "string";
SignificanceHeuristic significanceHeuristic = getRandomSignificanceheuristic();
List<InternalAggregation> aggs = new ArrayList<>();
List<InternalSignificantTerms.Bucket> terms0Buckets = new ArrayList<>();
terms0Buckets.add(createBucket(type, 4, 4, 5, 10, 0));
aggs.add(createAggregation(type, significanceHeuristic, terms0Buckets, 4, 10));
List<InternalSignificantTerms.Bucket> terms1Buckets = new ArrayList<>();
terms0Buckets.add(createBucket(type, 4, 4, 5, 10, 1));
aggs.add(createAggregation(type, significanceHeuristic, terms1Buckets, 4, 10));
List<InternalSignificantTerms.Bucket> terms01Buckets = new ArrayList<>();
terms0Buckets.add(createBucket(type, 4, 8, 5, 10, 0));
terms0Buckets.add(createBucket(type, 4, 8, 5, 10, 1));
aggs.add(createAggregation(type, significanceHeuristic, terms01Buckets, 8, 10));
return aggs;
}
private InternalSignificantTerms createAggregation(String type, SignificanceHeuristic significanceHeuristic, List<InternalSignificantTerms.Bucket> buckets, long subsetSize, long supersetSize) {
if (type.equals("string")) {
return new SignificantStringTerms(subsetSize, supersetSize, "sig_terms", 2, -1, significanceHeuristic, buckets, new ArrayList<PipelineAggregator>(), new HashMap<String, Object>());
} else {
return new SignificantLongTerms(subsetSize, supersetSize, "sig_terms", ValueFormatter.RAW, 2, -1, significanceHeuristic, buckets, new ArrayList<PipelineAggregator>(), new HashMap<String, Object>());
}
}
private InternalSignificantTerms.Bucket createBucket(String type, long subsetDF, long subsetSize, long supersetDF, long supersetSize, long label) {
if (type.equals("string")) {
return new SignificantStringTerms.Bucket(new BytesRef(Long.toString(label).getBytes(StandardCharsets.UTF_8)), subsetDF, subsetSize, supersetDF, supersetSize, InternalAggregations.EMPTY);
} else {
return new SignificantLongTerms.Bucket(subsetDF, subsetSize, supersetDF, supersetSize, label, InternalAggregations.EMPTY, ValueFormatter.RAW);
}
}
// test that // test that
// 1. The output of the builders can actually be parsed // 1. The output of the builders can actually be parsed
// 2. The parser does not swallow parameters after a significance heuristic was defined // 2. The parser does not swallow parameters after a significance heuristic was defined