From a6bd64f30db2f1a5a4c1c64a98de35f6fb72019c Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 14 Jul 2010 11:30:39 +0300 Subject: [PATCH] Facet results vary depending on size, closes #259. --- .../search/type/TransportSearchHelper.java | 4 +- .../type/TransportSearchTypeAction.java | 2 +- .../elasticsearch/search/SearchService.java | 2 +- .../facets/terms/TermsFacetCollector.java | 8 ++- .../terms/TermsFacetCollectorParser.java | 2 +- .../internal/InternalSearchRequest.java | 17 +++-- .../search/internal/SearchContext.java | 9 ++- .../SingleInstanceEmbeddedSearchTests.java | 2 +- .../TwoInstanceEmbeddedSearchTests.java | 2 +- ...ceUnbalancedShardsEmbeddedSearchTests.java | 2 +- .../search/facets/SimpleFacetsTests.java | 70 +++++++++++++++++++ 11 files changed, 105 insertions(+), 15 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index 5e1792e9b85..6f800e9aa36 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -63,8 +63,8 @@ public abstract class TransportSearchHelper { return ret; } - public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, SearchRequest request) { - InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting); + public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request) { + InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards); internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength()); internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength()); internalRequest.scroll(request.scroll()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 145f4bf62a4..3fe0f50d427 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -191,7 +191,7 @@ public abstract class TransportSearchTypeAction extends BaseAction() { + sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request), new SearchServiceListener() { @Override public void onResult(FirstResult result) { onFirstPhaseResult(shard, result, shardIt); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index 1bf438e07ee..7ad32788555 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -290,7 +290,7 @@ public class SearchService extends AbstractLifecycleComponent { SearchShardTarget shardTarget = new SearchShardTarget(clusterService.state().nodes().localNodeId(), request.index(), request.shardId()); - SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.timeout(), request.types(), engineSearcher, indexService, scriptService); + SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.numberOfShards(), request.timeout(), request.types(), engineSearcher, indexService, scriptService); context.scroll(request.scroll()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollector.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollector.java index 6c6d76737d7..b86ecce207c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollector.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollector.java @@ -57,6 +57,8 @@ public class TermsFacetCollector extends AbstractFacetCollector { private final int size; + private final int numberOfShards; + private final FieldData.Type fieldDataType; private FieldData fieldData; @@ -65,10 +67,11 @@ public class TermsFacetCollector extends AbstractFacetCollector { private final ImmutableSet excluded; - public TermsFacetCollector(String facetName, String fieldName, int size, FieldDataCache fieldDataCache, MapperService mapperService, ImmutableSet excluded) { + public TermsFacetCollector(String facetName, String fieldName, int size, int numberOfShards, FieldDataCache fieldDataCache, MapperService mapperService, ImmutableSet excluded) { super(facetName); this.fieldDataCache = fieldDataCache; this.size = size; + this.numberOfShards = numberOfShards; this.excluded = excluded; FieldMapper mapper = mapperService.smartNameFieldMapper(fieldName); @@ -101,7 +104,8 @@ public class TermsFacetCollector extends AbstractFacetCollector { pushFacets(facets); return new InternalTermsFacet(facetName, fieldName, InternalTermsFacet.ComparatorType.COUNT, size, ImmutableList.of()); } else { - BoundedTreeSet ordered = new BoundedTreeSet(InternalTermsFacet.ComparatorType.COUNT.comparator(), size); + // we need to fetch facets of "size * numberOfShards" because of problems in how they are distributed across shards + BoundedTreeSet ordered = new BoundedTreeSet(InternalTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards); for (TObjectIntIterator it = facets.iterator(); it.hasNext();) { it.advance(); ordered.add(new InternalTermsFacet.Entry(it.key(), it.value())); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollectorParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollectorParser.java index b77a4b3b477..9ec6e0f9833 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollectorParser.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/facets/terms/TermsFacetCollectorParser.java @@ -64,6 +64,6 @@ public class TermsFacetCollectorParser implements FacetCollectorParser { } } } - return new TermsFacetCollector(facetName, field, size, context.fieldDataCache(), context.mapperService(), excluded); + return new TermsFacetCollector(facetName, field, size, context.numberOfShards(), context.fieldDataCache(), context.mapperService(), excluded); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java index 6f0d4e593b9..6e317eb84c1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java @@ -51,7 +51,7 @@ import static org.elasticsearch.search.Scroll.*; * } * * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class InternalSearchRequest implements Streamable { @@ -59,6 +59,8 @@ public class InternalSearchRequest implements Streamable { private int shardId; + private int numberOfShards; + private Scroll scroll; private TimeValue timeout; @@ -76,13 +78,14 @@ public class InternalSearchRequest implements Streamable { public InternalSearchRequest() { } - public InternalSearchRequest(ShardRouting shardRouting) { - this(shardRouting.index(), shardRouting.id()); + public InternalSearchRequest(ShardRouting shardRouting, int numberOfShards) { + this(shardRouting.index(), shardRouting.id(), numberOfShards); } - public InternalSearchRequest(String index, int shardId) { + public InternalSearchRequest(String index, int shardId, int numberOfShards) { this.index = index; this.shardId = shardId; + this.numberOfShards = numberOfShards; } public String index() { @@ -93,6 +96,10 @@ public class InternalSearchRequest implements Streamable { return shardId; } + public int numberOfShards() { + return numberOfShards; + } + public byte[] source() { return this.source; } @@ -164,6 +171,7 @@ public class InternalSearchRequest implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { index = in.readUTF(); shardId = in.readVInt(); + numberOfShards = in.readVInt(); if (in.readBoolean()) { scroll = readScroll(in); } @@ -198,6 +206,7 @@ public class InternalSearchRequest implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeUTF(index); out.writeVInt(shardId); + out.writeVInt(numberOfShards); if (scroll == null) { out.writeBoolean(false); } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 4687e23969f..f18f94942cb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -57,6 +57,8 @@ public class SearchContext implements Releasable { private final SearchShardTarget shardTarget; + private final int numberOfShards; + private final Engine.Searcher engineSearcher; private final ScriptService scriptService; @@ -116,10 +118,11 @@ public class SearchContext implements Releasable { private volatile Timeout keepAliveTimeout; - public SearchContext(long id, SearchShardTarget shardTarget, TimeValue timeout, + public SearchContext(long id, SearchShardTarget shardTarget, int numberOfShards, TimeValue timeout, String[] types, Engine.Searcher engineSearcher, IndexService indexService, ScriptService scriptService) { this.id = id; this.shardTarget = shardTarget; + this.numberOfShards = numberOfShards; this.timeout = timeout; this.types = types; this.engineSearcher = engineSearcher; @@ -155,6 +158,10 @@ public class SearchContext implements Releasable { return this.shardTarget; } + public int numberOfShards() { + return this.numberOfShards; + } + public String[] types() { return types; } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java index 6f561ce2c7a..66b6c298bf3 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java @@ -179,7 +179,7 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractNodesTests { private InternalSearchRequest searchRequest(SearchSourceBuilder builder) { - return new InternalSearchRequest("test", 0).source(builder.buildAsBytes()); + return new InternalSearchRequest("test", 0, 1).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java index 34e5a5597e6..5ceb3279178 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceEmbeddedSearchTests.java @@ -357,7 +357,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests { } private InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) { - return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes()); + return new InternalSearchRequest(shardRouting, 3).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java index 2386a32673e..27de7db196d 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TwoInstanceUnbalancedShardsEmbeddedSearchTests.java @@ -363,7 +363,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode } private static InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) { - return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes()); + return new InternalSearchRequest(shardRouting, 3).source(builder.buildAsBytes()); } private void index(Client client, String id, String nameValue, int age) { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facets/SimpleFacetsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facets/SimpleFacetsTests.java index 38c0406f36f..9831585c163 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facets/SimpleFacetsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/facets/SimpleFacetsTests.java @@ -208,6 +208,76 @@ public class SimpleFacetsTests extends AbstractNodesTests { assertThat(facet.entries().get(1).count(), equalTo(1)); } + @Test public void testTermFacetWithEqualTermDistribution() throws Exception { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // ignore + } + client.admin().indices().prepareCreate("test").execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + // at the end of the index, we should have 10 of each `bar`, `foo`, and `baz` + for (int i = 0; i < 5; i++) { + client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() + .field("text", "foo bar") + .endObject()).execute().actionGet(); + } + for (int i = 0; i < 5; i++) { + client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() + .field("text", "bar baz") + .endObject()).execute().actionGet(); + } + + for (int i = 0; i < 5; i++) { + client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject() + .field("text", "baz foo") + .endObject()).execute().actionGet(); + } + client.admin().indices().prepareRefresh().execute().actionGet(); + + SearchResponse searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("facet1").field("text").size(3)) + .execute().actionGet(); + + TermsFacet facet = searchResponse.facets().facet(TermsFacet.class, "facet1"); + assertThat(facet.name(), equalTo("facet1")); + assertThat(facet.entries().size(), equalTo(3)); + for (int i = 0; i < 3; i++) { + assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz"))); + assertThat(facet.entries().get(i).count(), equalTo(10)); + } + + searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("facet1").field("text").size(2)) + .execute().actionGet(); + + facet = searchResponse.facets().facet(TermsFacet.class, "facet1"); + assertThat(facet.name(), equalTo("facet1")); + assertThat(facet.entries().size(), equalTo(2)); + for (int i = 0; i < 2; i++) { + assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz"))); + assertThat(facet.entries().get(i).count(), equalTo(10)); + } + + searchResponse = client.prepareSearch() + .setQuery(matchAllQuery()) + .addFacet(termsFacet("facet1").field("text").size(1)) + .execute().actionGet(); + + facet = searchResponse.facets().facet(TermsFacet.class, "facet1"); + assertThat(facet.name(), equalTo("facet1")); + assertThat(facet.entries().size(), equalTo(1)); + for (int i = 0; i < 1; i++) { + assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz"))); + assertThat(facet.entries().get(i).count(), equalTo(10)); + } + } + @Test public void testStatsFacets() throws Exception { try { client.admin().indices().prepareDelete("test").execute().actionGet();