Facet results vary depending on size, closes #259.

This commit is contained in:
kimchy 2010-07-14 11:30:39 +03:00
parent 2704ab3d69
commit a6bd64f30d
11 changed files with 105 additions and 15 deletions

View File

@ -63,8 +63,8 @@ public abstract class TransportSearchHelper {
return ret;
}
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, SearchRequest request) {
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting);
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request) {
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards);
internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());
internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength());
internalRequest.scroll(request.scroll());

View File

@ -191,7 +191,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
if (node == null) {
onFirstPhaseResult(shard, shardIt, null);
} else {
sendExecuteFirstPhase(node, internalSearchRequest(shard, request), new SearchServiceListener<FirstResult>() {
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request), new SearchServiceListener<FirstResult>() {
@Override public void onResult(FirstResult result) {
onFirstPhaseResult(shard, result, shardIt);
}

View File

@ -290,7 +290,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.state().nodes().localNodeId(), request.index(), request.shardId());
SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.timeout(), request.types(), engineSearcher, indexService, scriptService);
SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.numberOfShards(), request.timeout(), request.types(), engineSearcher, indexService, scriptService);
context.scroll(request.scroll());

View File

@ -57,6 +57,8 @@ public class TermsFacetCollector extends AbstractFacetCollector {
private final int size;
private final int numberOfShards;
private final FieldData.Type fieldDataType;
private FieldData fieldData;
@ -65,10 +67,11 @@ public class TermsFacetCollector extends AbstractFacetCollector {
private final ImmutableSet<String> excluded;
public TermsFacetCollector(String facetName, String fieldName, int size, FieldDataCache fieldDataCache, MapperService mapperService, ImmutableSet<String> excluded) {
public TermsFacetCollector(String facetName, String fieldName, int size, int numberOfShards, FieldDataCache fieldDataCache, MapperService mapperService, ImmutableSet<String> excluded) {
super(facetName);
this.fieldDataCache = fieldDataCache;
this.size = size;
this.numberOfShards = numberOfShards;
this.excluded = excluded;
FieldMapper mapper = mapperService.smartNameFieldMapper(fieldName);
@ -101,7 +104,8 @@ public class TermsFacetCollector extends AbstractFacetCollector {
pushFacets(facets);
return new InternalTermsFacet(facetName, fieldName, InternalTermsFacet.ComparatorType.COUNT, size, ImmutableList.<InternalTermsFacet.Entry>of());
} else {
BoundedTreeSet<InternalTermsFacet.Entry> ordered = new BoundedTreeSet<InternalTermsFacet.Entry>(InternalTermsFacet.ComparatorType.COUNT.comparator(), size);
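// each shard must return its top "size * numberOfShards" entries: a term that belongs in the global top "size" may fall outside a single shard's local top "size", which would make merged counts depend on the requested size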
BoundedTreeSet<InternalTermsFacet.Entry> ordered = new BoundedTreeSet<InternalTermsFacet.Entry>(InternalTermsFacet.ComparatorType.COUNT.comparator(), size * numberOfShards);
for (TObjectIntIterator<String> it = facets.iterator(); it.hasNext();) {
it.advance();
ordered.add(new InternalTermsFacet.Entry(it.key(), it.value()));

View File

@ -64,6 +64,6 @@ public class TermsFacetCollectorParser implements FacetCollectorParser {
}
}
}
return new TermsFacetCollector(facetName, field, size, context.fieldDataCache(), context.mapperService(), excluded);
return new TermsFacetCollector(facetName, field, size, context.numberOfShards(), context.fieldDataCache(), context.mapperService(), excluded);
}
}

View File

@ -51,7 +51,7 @@ import static org.elasticsearch.search.Scroll.*;
* }
* </pre>
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class InternalSearchRequest implements Streamable {
@ -59,6 +59,8 @@ public class InternalSearchRequest implements Streamable {
private int shardId;
private int numberOfShards;
private Scroll scroll;
private TimeValue timeout;
@ -76,13 +78,14 @@ public class InternalSearchRequest implements Streamable {
/**
 * Creates an empty request with no fields set.
 * NOTE(review): presumably intended for deserialization, with the fields
 * populated afterwards through {@code readFrom} — confirm against callers.
 */
public InternalSearchRequest() {
}
public InternalSearchRequest(ShardRouting shardRouting) {
this(shardRouting.index(), shardRouting.id());
public InternalSearchRequest(ShardRouting shardRouting, int numberOfShards) {
this(shardRouting.index(), shardRouting.id(), numberOfShards);
}
public InternalSearchRequest(String index, int shardId) {
public InternalSearchRequest(String index, int shardId, int numberOfShards) {
this.index = index;
this.shardId = shardId;
this.numberOfShards = numberOfShards;
}
public String index() {
@ -93,6 +96,10 @@ public class InternalSearchRequest implements Streamable {
return shardId;
}
/**
 * Returns the shard count carried by this request, as given at construction
 * time or read back from the stream.
 *
 * @return the value of the {@code numberOfShards} field
 */
public int numberOfShards() {
    return this.numberOfShards;
}
/**
 * Returns the source bytes held by this request.
 *
 * @return the backing {@code source} array itself (not a copy)
 */
public byte[] source() {
    return source;
}
@ -164,6 +171,7 @@ public class InternalSearchRequest implements Streamable {
@Override public void readFrom(StreamInput in) throws IOException {
index = in.readUTF();
shardId = in.readVInt();
numberOfShards = in.readVInt();
if (in.readBoolean()) {
scroll = readScroll(in);
}
@ -198,6 +206,7 @@ public class InternalSearchRequest implements Streamable {
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(index);
out.writeVInt(shardId);
out.writeVInt(numberOfShards);
if (scroll == null) {
out.writeBoolean(false);
} else {

View File

@ -57,6 +57,8 @@ public class SearchContext implements Releasable {
private final SearchShardTarget shardTarget;
private final int numberOfShards;
private final Engine.Searcher engineSearcher;
private final ScriptService scriptService;
@ -116,10 +118,11 @@ public class SearchContext implements Releasable {
private volatile Timeout keepAliveTimeout;
public SearchContext(long id, SearchShardTarget shardTarget, TimeValue timeout,
public SearchContext(long id, SearchShardTarget shardTarget, int numberOfShards, TimeValue timeout,
String[] types, Engine.Searcher engineSearcher, IndexService indexService, ScriptService scriptService) {
this.id = id;
this.shardTarget = shardTarget;
this.numberOfShards = numberOfShards;
this.timeout = timeout;
this.types = types;
this.engineSearcher = engineSearcher;
@ -155,6 +158,10 @@ public class SearchContext implements Releasable {
return this.shardTarget;
}
/**
 * Returns the shard count that was handed to this context's constructor.
 *
 * @return the value of the {@code numberOfShards} field
 */
public int numberOfShards() {
    return numberOfShards;
}
/**
 * Returns the types this context holds.
 *
 * @return the backing {@code types} array itself (not a copy)
 */
public String[] types() {
    return this.types;
}

View File

@ -179,7 +179,7 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractNodesTests {
private InternalSearchRequest searchRequest(SearchSourceBuilder builder) {
return new InternalSearchRequest("test", 0).source(builder.buildAsBytes());
return new InternalSearchRequest("test", 0, 1).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {

View File

@ -357,7 +357,7 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
}
private InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) {
return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes());
return new InternalSearchRequest(shardRouting, 3).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {

View File

@ -363,7 +363,7 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
}
private static InternalSearchRequest searchRequest(ShardRouting shardRouting, SearchSourceBuilder builder) {
return new InternalSearchRequest(shardRouting).source(builder.buildAsBytes());
return new InternalSearchRequest(shardRouting, 3).source(builder.buildAsBytes());
}
private void index(Client client, String id, String nameValue, int age) {

View File

@ -208,6 +208,76 @@ public class SimpleFacetsTests extends AbstractNodesTests {
assertThat(facet.entries().get(1).count(), equalTo(1));
}
/**
 * Verifies that a terms facet reports the same counts regardless of the
 * requested facet size when every term occurs equally often.
 *
 * Indexes 15 documents (5 each of "foo bar", "bar baz" and "baz foo"), so each
 * of the three terms occurs in exactly 10 documents, then requests the facet
 * with sizes 3, 2 and 1 and checks every returned entry.
 */
@Test public void testTermFacetWithEqualTermDistribution() throws Exception {
    try {
        client.admin().indices().prepareDelete("test").execute().actionGet();
    } catch (Exception e) {
        // deliberately ignored: best-effort cleanup (presumably the index may not exist yet)
    }
    client.admin().indices().prepareCreate("test").execute().actionGet();
    // a single wait is enough; a second identical wait adds nothing once the cluster is green
    client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();

    // at the end of the index, we should have 10 of each `bar`, `foo`, and `baz`
    String[] texts = {"foo bar", "bar baz", "baz foo"};
    for (String text : texts) {
        for (int i = 0; i < 5; i++) {
            client.prepareIndex("test", "type1").setSource(jsonBuilder().startObject()
                    .field("text", text)
                    .endObject()).execute().actionGet();
        }
    }
    client.admin().indices().prepareRefresh().execute().actionGet();

    // same expectations for every facet size, largest first
    for (int size = 3; size >= 1; size--) {
        assertEqualTermDistributionFacet(size);
    }
}

/**
 * Runs a match-all search with a terms facet of the given size on the "text"
 * field and asserts that exactly {@code size} entries come back, each being
 * one of foo/bar/baz with a count of 10.
 *
 * @param size the facet size to request and the number of entries expected
 */
private void assertEqualTermDistributionFacet(int size) throws Exception {
    SearchResponse searchResponse = client.prepareSearch()
            .setQuery(matchAllQuery())
            .addFacet(termsFacet("facet1").field("text").size(size))
            .execute().actionGet();

    TermsFacet facet = searchResponse.facets().facet(TermsFacet.class, "facet1");
    assertThat(facet.name(), equalTo("facet1"));
    assertThat(facet.entries().size(), equalTo(size));
    for (int i = 0; i < size; i++) {
        assertThat(facet.entries().get(i).term(), anyOf(equalTo("foo"), equalTo("bar"), equalTo("baz")));
        assertThat(facet.entries().get(i).count(), equalTo(10));
    }
}
@Test public void testStatsFacets() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();