From f3c6108b717bb4d9a61139ce142d109e304cbab2 Mon Sep 17 00:00:00 2001 From: uboness Date: Mon, 30 Sep 2013 02:14:34 +0200 Subject: [PATCH] introduced support for "shard_size" for terms & terms_stats facets. The "shard_size" is the number of term entries each shard will send back to the coordinating node. "shard_size" > "size" will increase the accuracy (both in terms of the counts associated with each term and the terms that will actually be returned the user) - of course, the higher "shard_size" is, the more expensive the processing becomes as bigger queues are maintained on a shard level and larger lists are streamed back from the shards. closes #3821 --- .../search/facets/terms-facet.asciidoc | 30 + .../search/facets/terms-stats-facet.asciidoc | 8 + .../search/facet/terms/TermsFacetBuilder.java | 12 + .../search/facet/terms/TermsFacetParser.java | 24 +- .../doubles/InternalDoubleTermsFacet.java | 27 +- .../doubles/TermsDoubleFacetExecutor.java | 8 +- .../terms/index/IndexNameFacetExecutor.java | 4 +- .../terms/longs/InternalLongTermsFacet.java | 27 +- .../terms/longs/TermsLongFacetExecutor.java | 8 +- .../FieldsTermsStringFacetExecutor.java | 8 +- .../facet/terms/strings/HashedAggregator.java | 12 +- .../strings/InternalStringTermsFacet.java | 23 +- .../ScriptTermsStringFieldFacetExecutor.java | 10 +- .../strings/TermsStringFacetExecutor.java | 6 +- .../TermsStringOrdinalsFacetExecutor.java | 10 +- .../termsstats/TermsStatsFacetBuilder.java | 14 + .../termsstats/TermsStatsFacetParser.java | 13 +- .../InternalTermsStatsDoubleFacet.java | 22 +- .../TermsStatsDoubleFacetExecutor.java | 6 +- .../longs/InternalTermsStatsLongFacet.java | 22 +- .../longs/TermsStatsLongFacetExecutor.java | 6 +- .../InternalTermsStatsStringFacet.java | 26 +- .../TermsStatsStringFacetExecutor.java | 8 +- .../facet/terms/ShardSizeTermsFacetTests.java | 423 ++++++++++++++ .../ShardSizeTermsStatsFacetTests.java | 548 ++++++++++++++++++ 25 files changed, 1252 insertions(+), 53 deletions(-) create mode 100644 src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java create mode 100644 src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java diff --git a/docs/reference/search/facets/terms-facet.asciidoc b/docs/reference/search/facets/terms-facet.asciidoc index 47d9d03c8d1..241a1556823 100644 --- a/docs/reference/search/facets/terms-facet.asciidoc +++ b/docs/reference/search/facets/terms-facet.asciidoc @@ -24,6 +24,36 @@ example: It is preferred to have the terms facet executed on a non analyzed field, or a field without a large number of terms it breaks to. +==== Accuracy Control + +added[0.90.6] + +The `size` parameter defines how many top terms should be returned out +of the overall terms list. By default, the node coordinating the +search process will ask each shard to provide its own top `size` terms +and once all shards respond, it will reduces the results to the final list +that will then be sent back to the client. This means that if the number +of unique terms is greater than `size`, the returned list is slightly off +and not accurate (it could be that the term counts are slightly off and it +could even be that a term that should have been in the top `size` entries +was not returned). + +The higher the requested `size` is, the more accurate the results will be, +but also, the more expensive it will be to compute the final results (both +due to bigger priority queues that are managed on a shard level and due to +bigger data transfers between the nodes and the client). In an attempt to +minimize the extra work that comes with bigger requested `size` we a +`shard_size` parameter was introduced. The once defined, it will determine +how many terms the coordinating node is requesting from each shard. Once +all the shards responded, the coordinating node will then reduce them +to a final result which will be based on the `size` parameter - this way, +once can increase the accuracy of the returned terms and avoid the overhead +of streaming a big list of terms back to the client. + +Note that `shard_size` cannot be smaller than `size`... if that's the case +elasticsearch will override it and reset it to be equal to `size`. + + ==== Ordering Allow to control the ordering of the terms facets, to be ordered by diff --git a/docs/reference/search/facets/terms-stats-facet.asciidoc b/docs/reference/search/facets/terms-stats-facet.asciidoc index 74f0cc23cd9..cd4875789f5 100644 --- a/docs/reference/search/facets/terms-stats-facet.asciidoc +++ b/docs/reference/search/facets/terms-stats-facet.asciidoc @@ -28,6 +28,14 @@ The `size` parameter controls how many facet entries will be returned. It defaults to `10`. Setting it to 0 will return all terms matching the hits (be careful not to return too many results). +One can also set `shard_size` (in addition to `size`) which will determine +how many term entries will be requested from each shard. When dealing +with field with high cardinality (at least higher than the requested `size`) +The greater `shard_size` is - the more accurate the result will be (and the +more expensive the overall facet computation will be). `shard_size` is there +to enable you to increase accuracy yet still avoid returning too many +terms_stats entries back to the client. + Ordering is done by setting `order`, with possible values of `term`, `reverse_term`, `count`, `reverse_count`, `total`, `reverse_total`, `min`, `reverse_min`, `max`, `reverse_max`, `mean`, `reverse_mean`. diff --git a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java index 47f1daad655..cebe35ae801 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetBuilder.java @@ -37,6 +37,7 @@ public class TermsFacetBuilder extends FacetBuilder { private String fieldName; private String[] fieldsNames; private int size = 10; + private int shardSize = -1; private Boolean allTerms; private Object[] exclude; private String regex; @@ -124,6 +125,11 @@ public class TermsFacetBuilder extends FacetBuilder { return this; } + public TermsFacetBuilder shardSize(int shardSize) { + this.shardSize = shardSize; + return this; + } + /** * A regular expression to use in order to further filter terms. */ @@ -213,6 +219,12 @@ public class TermsFacetBuilder extends FacetBuilder { builder.field("field", fieldName); } builder.field("size", size); + + // no point in sending shard size if it's not greater than size + if (shardSize > size) { + builder.field("shard_size", shardSize); + } + if (exclude != null) { builder.startArray("exclude"); for (Object ex : exclude) { diff --git a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java index b8471ac7a29..ddc0877f82a 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/TermsFacetParser.java @@ -82,6 +82,7 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { public FacetExecutor parse(String facetName, XContentParser parser, SearchContext context) throws IOException { String field = null; int size = 10; + int shardSize = -1; String[] fieldsNames = null; ImmutableSet excluded = ImmutableSet.of(); @@ -124,6 +125,8 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { script = parser.text(); } else if ("size".equals(currentFieldName)) { size = parser.intValue(); + } else if ("shard_size".equals(currentFieldName)) { + shardSize = parser.intValue(); } else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) { allTerms = parser.booleanValue(); } else if ("regex".equals(currentFieldName)) { @@ -143,7 +146,7 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { } if ("_index".equals(field)) { - return new IndexNameFacetExecutor(context.shardTarget().index(), comparatorType, size); + return new IndexNameFacetExecutor(context.shardTarget().index(), comparatorType, size, shardSize); } if (fieldsNames != null && fieldsNames.length == 1) { @@ -161,6 +164,11 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { searchScript = context.scriptService().search(context.lookup(), scriptLang, script, params); } + // shard_size cannot be smaller than size as we need to at least fetch entries from every shards in order to return + if (shardSize < size) { + shardSize = size; + } + if (fieldsNames != null) { // in case of multi files, we only collect the fields that are mapped and facet on them. @@ -175,10 +183,10 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { // non of the fields is mapped return new UnmappedFieldExecutor(size, comparatorType); } - return new FieldsTermsStringFacetExecutor(facetName, mappers.toArray(new FieldMapper[mappers.size()]), size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new FieldsTermsStringFacetExecutor(mappers.toArray(new FieldMapper[mappers.size()]), size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } if (field == null && fieldsNames == null && script != null) { - return new ScriptTermsStringFieldFacetExecutor(size, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler()); + return new ScriptTermsStringFieldFacetExecutor(size, shardSize, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler()); } FieldMapper fieldMapper = context.smartNameFieldMapper(field); @@ -190,17 +198,17 @@ public class TermsFacetParser extends AbstractComponent implements FacetParser { if (indexFieldData instanceof IndexNumericFieldData) { IndexNumericFieldData indexNumericFieldData = (IndexNumericFieldData) indexFieldData; if (indexNumericFieldData.getNumericType().isFloatingPoint()) { - return new TermsDoubleFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); + return new TermsDoubleFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); } else { - return new TermsLongFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); + return new TermsLongFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler()); } } else { if (script != null || "map".equals(executionHint)) { - return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } else if (indexFieldData instanceof IndexFieldData.WithOrdinals) { - return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove); + return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove); } else { - return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript); + return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript); } } } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java index df260768495..10b59a990ff 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/doubles/InternalDoubleTermsFacet.java @@ -162,7 +162,13 @@ public class InternalDoubleTermsFacet extends InternalTermsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + Facet facet = facets.get(0); + + // can be of type InternalStringTermsFacet representing unmapped fields + if (facet instanceof InternalDoubleTermsFacet) { + ((InternalDoubleTermsFacet) facet).trimExcessEntries(); + } + return facet; } InternalDoubleTermsFacet first = null; @@ -197,6 +203,25 @@ public class InternalDoubleTermsFacet extends InternalTermsFacet { return first; } + private void trimExcessEntries() { + if (requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java index 86f9c24208f..821dd23c248 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/doubles/TermsDoubleFacetExecutor.java @@ -53,6 +53,7 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { private final IndexNumericFieldData indexFieldData; private final TermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final SearchScript script; private final ImmutableSet excluded; @@ -60,10 +61,11 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { long missing; long total; - public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, SearchScript script, CacheRecycler cacheRecycler) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.excluded = excluded; @@ -120,7 +122,7 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { return new InternalDoubleTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value())); @@ -132,7 +134,7 @@ public class TermsDoubleFacetExecutor extends FacetExecutor { facets.release(); return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/index/IndexNameFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/index/IndexNameFacetExecutor.java index dc8c6a3a748..7910d1328a0 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/index/IndexNameFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/index/IndexNameFacetExecutor.java @@ -36,13 +36,15 @@ public class IndexNameFacetExecutor extends FacetExecutor { private final String indexName; private final InternalStringTermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private int count = 0; - public IndexNameFacetExecutor(String indexName, TermsFacet.ComparatorType comparatorType, int size) { + public IndexNameFacetExecutor(String indexName, TermsFacet.ComparatorType comparatorType, int size, int shardSize) { this.indexName = indexName; this.comparatorType = comparatorType; this.size = size; + this.shardSize = shardSize; } @Override diff --git a/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java index 085a6cdf0a7..af89ec4d26a 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/longs/InternalLongTermsFacet.java @@ -163,7 +163,13 @@ public class InternalLongTermsFacet extends InternalTermsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + Facet facet = facets.get(0); + + // facet could be InternalStringTermsFacet representing unmapped fields + if (facet instanceof InternalLongTermsFacet) { + ((InternalLongTermsFacet) facet).trimExcessEntries(); + } + return facet; } InternalLongTermsFacet first = null; @@ -198,6 +204,25 @@ public class InternalLongTermsFacet extends InternalTermsFacet { return first; } + private void trimExcessEntries() { + if (requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java index f9d5c6722aa..9c392680931 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/longs/TermsLongFacetExecutor.java @@ -52,6 +52,7 @@ public class TermsLongFacetExecutor extends FacetExecutor { private final IndexNumericFieldData indexFieldData; private final TermsFacet.ComparatorType comparatorType; + private final int shardSize; private final int size; private final SearchScript script; private final ImmutableSet excluded; @@ -60,10 +61,11 @@ public class TermsLongFacetExecutor extends FacetExecutor { long missing; long total; - public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, SearchScript script, CacheRecycler cacheRecycler) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.excluded = excluded; @@ -119,7 +121,7 @@ public class TermsLongFacetExecutor extends FacetExecutor { return new InternalLongTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalLongTermsFacet.LongEntry(it.key(), it.value())); @@ -131,7 +133,7 @@ public class TermsLongFacetExecutor extends FacetExecutor { facets.release(); return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java index 8ecd473c831..3f417890925 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/FieldsTermsStringFacetExecutor.java @@ -41,15 +41,17 @@ public class FieldsTermsStringFacetExecutor extends FacetExecutor { private final InternalStringTermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final IndexFieldData[] indexFieldDatas; private final SearchScript script; private final HashedAggregator aggregator; long missing; long total; - public FieldsTermsStringFacetExecutor(String facetName, FieldMapper[] fieldMappers, int size, InternalStringTermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, - ImmutableSet excluded, Pattern pattern, SearchScript script) { + public FieldsTermsStringFacetExecutor(FieldMapper[] fieldMappers, int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType, + boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, SearchScript script) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.indexFieldDatas = new IndexFieldData[fieldMappers.length]; @@ -78,7 +80,7 @@ public class FieldsTermsStringFacetExecutor extends FacetExecutor { @Override public InternalFacet buildFacet(String facetName) { try { - return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator); + return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator); } finally { aggregator.release(); } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java index 54ef8e9ae0f..7dff3883e73 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/HashedAggregator.java @@ -100,14 +100,13 @@ public class HashedAggregator { public boolean shared(); } - public static InternalFacet buildFacet(String facetName, int size, long missing, long total, TermsFacet.ComparatorType comparatorType, + public static InternalFacet buildFacet(String facetName, int size, int shardSize, long missing, long total, TermsFacet.ComparatorType comparatorType, HashedAggregator aggregator) { if (aggregator.isEmpty()) { - return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.of(), - missing, total); + return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { - if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + if (shardSize < EntryPriorityQueue.LIMIT) { + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); BytesRefCountIterator iter = aggregator.getIter(); while (iter.next() != null) { ordered.insertWithOverflow(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count())); @@ -120,8 +119,7 @@ public class HashedAggregator { } return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet( - comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); BytesRefCountIterator iter = aggregator.getIter(); while (iter.next() != null) { ordered.add(new InternalStringTermsFacet.TermEntry(iter.makeSafe(), iter.count())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java index 03769e1879d..491232ab123 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/InternalStringTermsFacet.java @@ -172,7 +172,9 @@ public class InternalStringTermsFacet extends InternalTermsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { - return facets.get(0); + InternalStringTermsFacet facet = (InternalStringTermsFacet) facets.get(0); + facet.trimExcessEntries(); + return facet; } InternalStringTermsFacet first = null; @@ -215,6 +217,25 @@ public class InternalStringTermsFacet extends InternalTermsFacet { return first; } + private void trimExcessEntries() { + if (requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java index b4d6966b933..a2201a9b7f1 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/ScriptTermsStringFieldFacetExecutor.java @@ -48,6 +48,7 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { private final InternalStringTermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final SearchScript script; private final Matcher matcher; private final ImmutableSet excluded; @@ -57,10 +58,11 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { long missing; long total; - public ScriptTermsStringFieldFacetExecutor(int size, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context, + public ScriptTermsStringFieldFacetExecutor(int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType, SearchContext context, ImmutableSet excluded, Pattern pattern, String scriptLang, String script, Map params, CacheRecycler cacheRecycler) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.numberOfShards = context.numberOfShards(); this.script = context.scriptService().search(context.lookup(), scriptLang, script, params); @@ -82,8 +84,8 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { facets.release(); return new InternalStringTermsFacet(facetName, comparatorType, size, ImmutableList.of(), missing, total); } else { - if (size < EntryPriorityQueue.LIMIT) { - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + if (shardSize < EntryPriorityQueue.LIMIT) { + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); for (TObjectIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.insertWithOverflow(new InternalStringTermsFacet.TermEntry(it.key(), it.value())); @@ -95,7 +97,7 @@ public class ScriptTermsStringFieldFacetExecutor extends FacetExecutor { facets.release(); return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } else { - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); for (TObjectIntIterator it = facets.v().iterator(); it.hasNext(); ) { it.advance(); ordered.add(new InternalStringTermsFacet.TermEntry(it.key(), it.value())); diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java index 101928d3fb8..7dfc7fe5383 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringFacetExecutor.java @@ -44,6 +44,7 @@ public class TermsStringFacetExecutor extends FacetExecutor { private final TermsFacet.ComparatorType comparatorType; private final SearchScript script; + private final int shardSize; private final int size; // the aggregation map @@ -52,10 +53,11 @@ public class TermsStringFacetExecutor extends FacetExecutor { private final boolean allTerms; private final HashedAggregator aggregator; - public TermsStringFacetExecutor(IndexFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsStringFacetExecutor(IndexFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, SearchScript script) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.script = script; this.allTerms = allTerms; @@ -79,7 +81,7 @@ public class TermsStringFacetExecutor extends FacetExecutor { @Override public InternalFacet buildFacet(String facetName) { try { - return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator); + return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator); } finally { aggregator.release(); } diff --git a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java index 72d5e09e1a3..1fb6d1edc3c 100644 --- a/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/terms/strings/TermsStringOrdinalsFacetExecutor.java @@ -56,6 +56,7 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { final CacheRecycler cacheRecycler; private final TermsFacet.ComparatorType comparatorType; private final int size; + private final int shardSize; private final int minCount; private final ImmutableSet excluded; private final Matcher matcher; @@ -65,10 +66,11 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { long missing; long total; - public TermsStringOrdinalsFacetExecutor(IndexFieldData.WithOrdinals indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, + public TermsStringOrdinalsFacetExecutor(IndexFieldData.WithOrdinals indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context, ImmutableSet excluded, Pattern pattern, int ordinalsCacheAbove) { this.indexFieldData = indexFieldData; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.ordinalsCacheAbove = ordinalsCacheAbove; @@ -107,9 +109,9 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { } // YACK, we repeat the same logic, but once with an optimizer priority queue for smaller sizes - if (size < EntryPriorityQueue.LIMIT) { + if (shardSize < EntryPriorityQueue.LIMIT) { // optimize to use priority size - EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator()); + EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator()); while (queue.size() > 0) { ReaderAggregator agg = queue.top(); @@ -149,7 +151,7 @@ public class TermsStringOrdinalsFacetExecutor extends FacetExecutor { return new InternalStringTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total); } - BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), size); + BoundedTreeSet ordered = new BoundedTreeSet(comparatorType.comparator(), shardSize); while (queue.size() > 0) { ReaderAggregator agg = queue.top(); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java index b2ff0976eeb..7a58f680dd3 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetBuilder.java @@ -36,6 +36,7 @@ public class TermsStatsFacetBuilder extends FacetBuilder { private String keyField; private String valueField; private int size = -1; + private int shardSize = -1; private TermsStatsFacet.ComparatorType comparatorType; private String script; @@ -75,6 +76,16 @@ public class TermsStatsFacetBuilder extends FacetBuilder { return this; } + /** + * Sets the number of terms that will be returned from each shard. The higher the number the more accurate the results will be. The + * shard size cannot be smaller than {@link #size(int) size}, therefore in this case it will fall back and be treated as being equal to + * size. + */ + public TermsStatsFacetBuilder shardSize(int shardSize) { + this.shardSize = shardSize; + return this; + } + /** * Marks all terms to be returned, even ones with 0 counts. */ @@ -146,6 +157,9 @@ public class TermsStatsFacetBuilder extends FacetBuilder { if (size != -1) { builder.field("size", size); } + if (shardSize > size) { + builder.field("shard_size", shardSize); + } builder.endObject(); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java index 395e6b304fd..9bb1148cf5d 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/TermsStatsFacetParser.java @@ -67,6 +67,7 @@ public class TermsStatsFacetParser extends AbstractComponent implements FacetPar String keyField = null; String valueField = null; int size = 10; + int shardSize = -1; TermsStatsFacet.ComparatorType comparatorType = TermsStatsFacet.ComparatorType.COUNT; String scriptLang = null; String script = null; @@ -92,6 +93,8 @@ public class TermsStatsFacetParser extends AbstractComponent implements FacetPar script = parser.text(); } else if ("size".equals(currentFieldName)) { size = parser.intValue(); + } else if ("shard_size".equals(currentFieldName) || "shardSize".equals(currentFieldName)) { + shardSize = parser.intValue(); } else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) { if (parser.booleanValue()) { size = 0; // indicates all terms @@ -117,6 +120,10 @@ public class TermsStatsFacetParser extends AbstractComponent implements FacetPar } IndexFieldData keyIndexFieldData = context.fieldData().getForField(keyMapper); + if (shardSize < size) { + shardSize = size; + } + IndexNumericFieldData valueIndexFieldData = null; SearchScript valueScript = null; if (valueField != null) { @@ -134,11 +141,11 @@ public class TermsStatsFacetParser extends AbstractComponent implements FacetPar if (keyIndexFieldData instanceof IndexNumericFieldData) { IndexNumericFieldData keyIndexNumericFieldData = (IndexNumericFieldData) keyIndexFieldData; if (keyIndexNumericFieldData.getNumericType().isFloatingPoint()) { - return new TermsStatsDoubleFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new TermsStatsDoubleFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } else { - return new TermsStatsLongFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new TermsStatsLongFacetExecutor(keyIndexNumericFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } } - return new TermsStatsStringFacetExecutor(keyIndexFieldData, valueIndexFieldData, valueScript, size, comparatorType, context); + return new TermsStatsStringFacetExecutor(keyIndexFieldData, valueIndexFieldData, valueScript, size, shardSize, comparatorType, context); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java index 24b56b4c0e9..6a5ca0bba95 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/InternalTermsStatsDoubleFacet.java @@ -172,14 +172,15 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! - InternalTermsStatsDoubleFacet tsFacet = (InternalTermsStatsDoubleFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } + tsFacet.trimExcessEntries(); return facets.get(0); } int missing = 0; @@ -228,6 +229,25 @@ public class InternalTermsStatsDoubleFacet extends InternalTermsStatsFacet { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java index 933cb319707..e65cb134278 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/doubles/TermsStatsDoubleFacetExecutor.java @@ -48,13 +48,15 @@ public class TermsStatsDoubleFacetExecutor extends FacetExecutor { final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsDoubleFacetExecutor(IndexNumericFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript script, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; @@ -81,7 +83,7 @@ public class TermsStatsDoubleFacetExecutor extends FacetExecutor { Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - int limit = size; + int limit = shardSize; List ordered = Lists.newArrayList(); for (int i = 0; i < limit; i++) { InternalTermsStatsDoubleFacet.DoubleEntry value = (InternalTermsStatsDoubleFacet.DoubleEntry) values[i]; diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java index f4a70153dcd..5e02f3adb3e 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/InternalTermsStatsLongFacet.java @@ -172,14 +172,15 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! - InternalTermsStatsLongFacet tsFacet = (InternalTermsStatsLongFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } + tsFacet.trimExcessEntries(); return facets.get(0); } int missing = 0; @@ -228,6 +229,25 @@ public class InternalTermsStatsLongFacet extends InternalTermsStatsFacet { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java index 776fc8d0297..fe3451296f8 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/longs/TermsStatsLongFacetExecutor.java @@ -49,13 +49,15 @@ public class TermsStatsLongFacetExecutor extends FacetExecutor { final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsLongFacetExecutor(IndexNumericFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript script, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; @@ -84,7 +86,7 @@ public class TermsStatsLongFacetExecutor extends FacetExecutor { Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - int limit = size; + int limit = shardSize; List ordered = Lists.newArrayList(); for (int i = 0; i < limit; i++) { InternalTermsStatsLongFacet.LongEntry value = (InternalTermsStatsLongFacet.LongEntry) values[i]; diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java index 2e56ca91c76..667162a440f 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/InternalTermsStatsStringFacet.java @@ -177,15 +177,16 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet { public Facet reduce(ReduceContext context) { List facets = context.facets(); if (facets.size() == 1) { + InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facets.get(0); if (requiredSize == 0) { // we need to sort it here! - InternalTermsStatsStringFacet tsFacet = (InternalTermsStatsStringFacet) facets.get(0); if (!tsFacet.entries.isEmpty()) { List entries = tsFacet.mutableList(); CollectionUtil.timSort(entries, comparatorType.comparator()); } } - return facets.get(0); + tsFacet.trimExcessEntries(); + return tsFacet; } int missing = 0; Recycler.V> map = context.cacheRecycler().hashMap(-1); @@ -220,7 +221,7 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet { } else { Object[] values = map.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); - List ordered = new ArrayList(map.v().size()); + List ordered = new ArrayList(Math.min(map.v().size(), requiredSize)); for (int i = 0; i < requiredSize; i++) { StringEntry value = (StringEntry) values[i]; if (value == null) { @@ -233,6 +234,25 @@ public class InternalTermsStatsStringFacet extends InternalTermsStatsFacet { } } + private void trimExcessEntries() { + if (requiredSize == 0 || requiredSize >= entries.size()) { + return; + } + + if (entries instanceof List) { + entries = ((List) entries).subList(0, requiredSize); + return; + } + + int i = 0; + for (Iterator iter = entries.iterator(); iter.hasNext();) { + iter.next(); + if (i++ >= requiredSize) { + iter.remove(); + } + } + } + static final class Fields { static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); static final XContentBuilderString MISSING = new XContentBuilderString("missing"); diff --git a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java index eb7368540b0..f389239ae74 100644 --- a/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java +++ b/src/main/java/org/elasticsearch/search/facet/termsstats/strings/TermsStatsStringFacetExecutor.java @@ -51,16 +51,18 @@ public class TermsStatsStringFacetExecutor extends FacetExecutor { final IndexNumericFieldData valueIndexFieldData; final SearchScript script; private final int size; + private final int shardSize; final Recycler.V> entries; long missing; public TermsStatsStringFacetExecutor(IndexFieldData keyIndexFieldData, IndexNumericFieldData valueIndexFieldData, SearchScript valueScript, - int size, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { + int size, int shardSize, TermsStatsFacet.ComparatorType comparatorType, SearchContext context) { this.keyIndexFieldData = keyIndexFieldData; this.valueIndexFieldData = valueIndexFieldData; this.script = valueScript; this.size = size; + this.shardSize = shardSize; this.comparatorType = comparatorType; this.entries = context.cacheRecycler().hashMap(-1); @@ -79,13 +81,13 @@ public class TermsStatsStringFacetExecutor extends FacetExecutor { } if (size == 0) { // all terms // all terms, just return the collection, we will sort it on the way back - return new InternalTermsStatsStringFacet(facetName, comparatorType, 0 /* indicates all terms*/, entries.v().values(), missing); + return new InternalTermsStatsStringFacet(facetName, comparatorType, 0/* indicates all terms*/, entries.v().values(), missing); } Object[] values = entries.v().internalValues(); Arrays.sort(values, (Comparator) comparatorType.comparator()); List ordered = Lists.newArrayList(); - int limit = size; + int limit = shardSize; for (int i = 0; i < limit; i++) { InternalTermsStatsStringFacet.StringEntry value = (InternalTermsStatsStringFacet.StringEntry) values[i]; if (value == null) { diff --git a/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java b/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java new file mode 100644 index 00000000000..54d9eea5bb1 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/facet/terms/ShardSizeTermsFacetTests.java @@ -0,0 +1,423 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.facet.terms; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.facet.Facets; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.facet.FacetBuilders.termsFacet; +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.SUITE) +public class ShardSizeTermsFacetTests extends AbstractIntegrationTest { + + /** + * to properly test the effect/functionality of shard_size, we need to force having 2 shards and also + * control the routing such that certain documents will end on each shard. Using "djb" routing hash + ignoring the + * doc type when hashing will ensure that docs with routing value "1" will end up in a different shard than docs with + * routing value "2". + */ + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return randomSettingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + .put("cluster.routing.operation.hash.type", "djb") + .put("cluster.routing.operation.use_type", "false") + .build(); + } + + @Test + public void noShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 4) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 5) + .put("2", 4) + .put("3", 3) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_withExecutionHintMap() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).executionHint("map").order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 8) + .put("3", 8) + .put("2", 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_withExecutionHintMap_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).executionHint("map").order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put("1", 5) + .put("2", 4) + .put("3", 3) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 4) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 5) + .put(2, 4) + .put(3, 3) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 4) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 8) + .put(3, 8) + .put(2, 5) // <-- count is now fixed + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsFacet("keys").field("key").size(3).shardSize(5).order(TermsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsFacet terms = facets.facet("keys"); + List entries = terms.getEntries(); + assertThat(entries.size(), equalTo(3)); // we still only return 3 entries (based on the 'size' param) + Map expected = ImmutableMap.builder() + .put(1, 5) + .put(2, 4) + .put(3, 3) + .build(); + for (TermsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + private void indexData() throws Exception { + + /* + + + || || size = 3, shard_size = 5 || shard_size = size = 3 || + ||==========||==================================================||===============================================|| + || shard 1: || "1" - 5 | "2" - 4 | "3" - 3 | "4" - 2 | "5" - 1 || "1" - 5 | "3" - 3 | "2" - 4 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || shard 2: || "1" - 3 | "2" - 1 | "3" - 5 | "4" - 2 | "5" - 1 || "1" - 3 | "3" - 5 | "4" - 2 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || reduced: || "1" - 8 | "2" - 5 | "3" - 8 | "4" - 4 | "5" - 2 || || + || || || "1" - 8, "3" - 8, "2" - 4 <= WRONG || + || || "1" - 8 | "3" - 8 | "2" - 5 <= CORRECT || || + + + */ + + + indexDoc("1", "1", 5); + indexDoc("1", "2", 4); + indexDoc("1", "3", 3); + indexDoc("1", "4", 2); + indexDoc("1", "5", 1); + + // total docs in shard "1" = 15 + + indexDoc("2", "1", 3); + indexDoc("2", "2", 1); + indexDoc("2", "3", 5); + indexDoc("2", "4", 2); + indexDoc("2", "5", 1); + + // total docs in shard "2" = 12 + + client().admin().indices().prepareFlush("idx").execute().actionGet(); + client().admin().indices().prepareRefresh("idx").execute().actionGet(); + + long totalOnOne = client().prepareSearch("idx").setTypes("type").setRouting("1").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnOne, is(15l)); + long totalOnTwo = client().prepareSearch("idx").setTypes("type").setRouting("2").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnTwo, is(12l)); + } + + private void indexDoc(String shard, String key, int times) throws Exception { + for (int i = 0; i < times; i++) { + client().prepareIndex("idx", "type").setRouting(shard).setCreate(true).setSource(jsonBuilder() + .startObject() + .field("key", key) + .endObject()).execute().actionGet(); + } + } + +} diff --git a/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java b/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java new file mode 100644 index 00000000000..83c5fe5628a --- /dev/null +++ b/src/test/java/org/elasticsearch/search/facet/termsstats/ShardSizeTermsStatsFacetTests.java @@ -0,0 +1,548 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.facet.termsstats; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.facet.Facets; +import org.elasticsearch.test.AbstractIntegrationTest; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.search.facet.FacetBuilders.termsStatsFacet; +import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.SUITE) +public class ShardSizeTermsStatsFacetTests extends AbstractIntegrationTest { + + /** + * to properly test the effect/functionality of shard_size, we need to force having 2 shards and also + * control the routing such that certain documents will end on each shard. Using "djb" routing hash + ignoring the + * doc type when hashing will ensure that docs with routing value "1" will end up in a different shard than docs with + * routing value "2". + */ + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return randomSettingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + .put("cluster.routing.operation.hash.type", "djb") + .put("cluster.routing.operation.use_type", "false") + .build(); + } + + @Test + public void noShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_string_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .put("4", 4l) + .put("5", 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .put("4", 4l) + .put("5", 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 8l) + .put("3", 8l) + .put("2", 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void withShardSize_string_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=string,index=not_analyzed") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put("1", 5l) + .put("2", 4l) + .put("3", 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTerm().string()))); + } + } + + @Test + public void noShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_long_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_long_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=long") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 5l) + .put(2, 4l) + .put(3, 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 4l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void noShardSize_double_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_allTerms() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(0).shardSize(3).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(5)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .put(4, 4l) + .put(5, 2l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 8l) + .put(3, 8l) + .put(2, 5l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + @Test + public void withShardSize_double_singleShard() throws Exception { + + client().admin().indices().prepareCreate("idx") + .addMapping("type", "key", "type=double") + .execute().actionGet(); + + indexData(); + + SearchResponse response = client().prepareSearch("idx").setTypes("type").setRouting("1") + .setQuery(matchAllQuery()) + .addFacet(termsStatsFacet("keys").keyField("key").valueField("value").size(3).shardSize(5).order(TermsStatsFacet.ComparatorType.COUNT)) + .execute().actionGet(); + + Facets facets = response.getFacets(); + TermsStatsFacet facet = facets.facet("keys"); + List entries = facet.getEntries(); + assertThat(entries.size(), equalTo(3)); + Map expected = ImmutableMap.builder() + .put(1, 5l) + .put(2, 4l) + .put(3, 3l) + .build(); + for (TermsStatsFacet.Entry entry : entries) { + assertThat(entry.getCount(), equalTo(expected.get(entry.getTermAsNumber().intValue()))); + } + } + + private void indexData() throws Exception { + + /* + + + || || size = 3, shard_size = 5 || shard_size = size = 3 || + ||==========||==================================================||===============================================|| + || shard 1: || "1" - 5 | "2" - 4 | "3" - 3 | "4" - 2 | "5" - 1 || "1" - 5 | "3" - 3 | "2" - 4 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || shard 2: || "1" - 3 | "2" - 1 | "3" - 5 | "4" - 2 | "5" - 1 || "1" - 3 | "3" - 5 | "4" - 2 || + ||----------||--------------------------------------------------||-----------------------------------------------|| + || reduced: || "1" - 8 | "2" - 5 | "3" - 8 | "4" - 4 | "5" - 2 || || + || || || "1" - 8, "3" - 8, "2" - 4 <= WRONG || + || || "1" - 8 | "3" - 8 | "2" - 5 <= CORRECT || || + + + */ + + + indexDoc("1", "1", 5); + indexDoc("1", "2", 4); + indexDoc("1", "3", 3); + indexDoc("1", "4", 2); + indexDoc("1", "5", 1); + + // total docs in shard "1" = 15 + + indexDoc("2", "1", 3); + indexDoc("2", "2", 1); + indexDoc("2", "3", 5); + indexDoc("2", "4", 2); + indexDoc("2", "5", 1); + + // total docs in shard "2" = 12 + + client().admin().indices().prepareFlush("idx").execute().actionGet(); + client().admin().indices().prepareRefresh("idx").execute().actionGet(); + + long totalOnOne = client().prepareSearch("idx").setTypes("type").setRouting("1").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnOne, is(15l)); + long totalOnTwo = client().prepareSearch("idx").setTypes("type").setRouting("2").setQuery(matchAllQuery()).execute().actionGet().getHits().getTotalHits(); + assertThat(totalOnTwo, is(12l)); + } + + private void indexDoc(String shard, String key, int times) throws Exception { + for (int i = 0; i < times; i++) { + client().prepareIndex("idx", "type").setRouting(shard).setCreate(true).setSource(jsonBuilder() + .startObject() + .field("key", key) + .field("value", 1) + .endObject()).execute().actionGet(); + } + } + +}