maintain total hits across scan scroll requests

This commit is contained in:
kimchy 2011-03-20 00:19:26 +02:00
parent 52c750fc42
commit d2e61af9ee
4 changed files with 74 additions and 53 deletions

View File

@ -76,11 +76,10 @@ public class TransportSearchScanAction extends TransportSearchTypeAction {
}
@Override protected void moveToSecondPhase() throws Exception {
long totalHits = 0;
final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.<SearchShardTarget, FetchSearchResultProvider>of());
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), queryResults.values(), null);
scrollId = buildScrollId(request.searchType(), queryResults.values(), ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
searchCache.releaseQueryResults(queryResults);

View File

@ -108,7 +108,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
public void start() {
if (scrollId.context().length == 0) {
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false);
final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false);
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache)));
return;
@ -224,6 +224,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
}
}
final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.attributes().get("total_hits"));
for (QueryFetchSearchResult shardResult : queryFetchResults.values()) {
@ -236,7 +237,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent {
String scrollId = null;
if (request.scroll() != null) {
// we rebuild the scroll id since we remove shards that we finished scrolling on
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), null);
scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits
}
searchCache.releaseQueryFetchResults(queryFetchResults);
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),

View File

@ -95,7 +95,7 @@ public class InternalSearchHits implements SearchHits {
private InternalSearchHit[] hits;
private long totalHits;
public long totalHits;
private float maxScore;

View File

@ -56,62 +56,82 @@ public class SearchScanTests extends AbstractNodesTests {
return client("node1");
}
@Test public void testSimpleScroll1() throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
Set<String> ids = Sets.newHashSet();
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
expectedIds.add(id);
client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet();
}
client.admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse = client.prepareSearch()
.setSearchType(SearchType.SCAN)
.setQuery(matchAllQuery())
.setSize(7)
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
// start scrolling, until we get not results
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
assertThat(searchResponse.failedShards(), equalTo(0));
for (SearchHit hit : searchResponse.hits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.hits().totalHits() == 0) {
break;
}
}
assertThat(expectedIds, equalTo(ids));
@Test public void shard1docs100size3() throws Exception {
testScroll(1, 100, 3);
}
@Test public void testSimpleScroll2() throws Exception {
@Test public void shard1docs100size7() throws Exception {
testScroll(1, 100, 7);
}
@Test public void shard1docs100size13() throws Exception {
testScroll(1, 100, 13);
}
@Test public void shard1docs100size24() throws Exception {
testScroll(1, 100, 24);
}
@Test public void shard1docs100size45() throws Exception {
testScroll(1, 100, 45);
}
@Test public void shard1docs100size63() throws Exception {
testScroll(1, 100, 63);
}
@Test public void shard1docs100size89() throws Exception {
testScroll(1, 100, 89);
}
@Test public void shard1docs100size120() throws Exception {
testScroll(1, 100, 120);
}
@Test public void shard3docs100size3() throws Exception {
testScroll(3, 100, 3);
}
@Test public void shard3docs100size7() throws Exception {
testScroll(3, 100, 7);
}
@Test public void shard3docs100size13() throws Exception {
testScroll(3, 100, 13);
}
@Test public void shard3docs100size24() throws Exception {
testScroll(3, 100, 24);
}
@Test public void shard3docs100size45() throws Exception {
testScroll(3, 100, 45);
}
@Test public void shard3docs100size63() throws Exception {
testScroll(3, 100, 63);
}
@Test public void shard3docs100size89() throws Exception {
testScroll(3, 100, 89);
}
@Test public void shard3docs100size120() throws Exception {
testScroll(3, 100, 120);
}
private void testScroll(int numberOfShards, long numberOfDocs, int size) throws Exception {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
Set<String> ids = Sets.newHashSet();
Set<String> expectedIds = Sets.newHashSet();
for (int i = 0; i < 100; i++) {
for (int i = 0; i < numberOfDocs; i++) {
String id = Integer.toString(i);
expectedIds.add(id);
client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet();
@ -122,21 +142,22 @@ public class SearchScanTests extends AbstractNodesTests {
SearchResponse searchResponse = client.prepareSearch()
.setSearchType(SearchType.SCAN)
.setQuery(matchAllQuery())
.setSize(10)
.setSize(size)
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(100l));
assertThat(searchResponse.hits().totalHits(), equalTo(numberOfDocs));
// start scrolling, until we get not results
while (true) {
searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet();
assertThat(searchResponse.hits().totalHits(), equalTo(numberOfDocs));
assertThat(searchResponse.failedShards(), equalTo(0));
for (SearchHit hit : searchResponse.hits()) {
assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false));
ids.add(hit.id());
}
if (searchResponse.hits().totalHits() == 0) {
if (searchResponse.hits().hits().length == 0) {
break;
}
}