From d2e61af9eeb6a706ed3f43aa4cf707dc6d20c736 Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 20 Mar 2011 00:19:26 +0200 Subject: [PATCH] maintain total hits across scan scroll requests --- .../type/TransportSearchScanAction.java | 3 +- .../type/TransportSearchScrollScanAction.java | 5 +- .../search/internal/InternalSearchHits.java | 2 +- .../search/scan/SearchScanTests.java | 117 +++++++++++------- 4 files changed, 74 insertions(+), 53 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java index 42758cdbfc7..bab1b50b473 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScanAction.java @@ -76,11 +76,10 @@ public class TransportSearchScanAction extends TransportSearchTypeAction { } @Override protected void moveToSecondPhase() throws Exception { - long totalHits = 0; final InternalSearchResponse internalResponse = searchPhaseController.merge(EMPTY_DOCS, queryResults, ImmutableMap.of()); String scrollId = null; if (request.scroll() != null) { - scrollId = buildScrollId(request.searchType(), queryResults.values(), null); + scrollId = buildScrollId(request.searchType(), queryResults.values(), ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits()))); } listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures())); searchCache.releaseQueryResults(queryResults); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 9cccbf3665b..ee8af774e62 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -108,7 +108,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { public void start() { if (scrollId.context().length == 0) { - final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, 0, 0.0f), null, false); + final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false); searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); return; @@ -224,6 +224,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { } } final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults); + ((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.attributes().get("total_hits")); for (QueryFetchSearchResult shardResult : queryFetchResults.values()) { @@ -236,7 +237,7 @@ public class TransportSearchScrollScanAction extends AbstractComponent { String scrollId = null; if (request.scroll() != null) { // we rebuild the scroll id since we remove shards that we finished scrolling on - scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), null); + scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits } searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java index 54f1572399e..de66daa6fb7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHits.java @@ -95,7 +95,7 @@ public class InternalSearchHits implements SearchHits { private InternalSearchHit[] hits; - private long totalHits; + public long totalHits; private float maxScore; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java index dc1dedc3735..ea4c2832649 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/scan/SearchScanTests.java @@ -56,62 +56,82 @@ public class SearchScanTests extends AbstractNodesTests { return client("node1"); } - @Test public void testSimpleScroll1() throws Exception { - try { - client.admin().indices().prepareDelete("test").execute().actionGet(); - } catch (Exception e) { - // ignore - } - client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet(); - client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); - - Set ids = Sets.newHashSet(); - Set expectedIds = Sets.newHashSet(); - for (int i = 0; i < 100; i++) { - String id = Integer.toString(i); - expectedIds.add(id); - client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet(); - } - - client.admin().indices().prepareRefresh().execute().actionGet(); - - SearchResponse searchResponse = client.prepareSearch() - .setSearchType(SearchType.SCAN) - .setQuery(matchAllQuery()) - .setSize(7) - .setScroll(TimeValue.timeValueMinutes(2)) - .execute().actionGet(); - - assertThat(searchResponse.hits().totalHits(), equalTo(100l)); - - // start scrolling, until we get not results - while (true) { - searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet(); - assertThat(searchResponse.failedShards(), equalTo(0)); - for (SearchHit hit : searchResponse.hits()) { - assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false)); - ids.add(hit.id()); - } - if (searchResponse.hits().totalHits() == 0) { - break; - } - } - - assertThat(expectedIds, equalTo(ids)); + @Test public void shard1docs100size3() throws Exception { + testScroll(1, 100, 3); } - @Test public void testSimpleScroll2() throws Exception { + @Test public void shard1docs100size7() throws Exception { + testScroll(1, 100, 7); + } + + @Test public void shard1docs100size13() throws Exception { + testScroll(1, 100, 13); + } + + @Test public void shard1docs100size24() throws Exception { + testScroll(1, 100, 24); + } + + @Test public void shard1docs100size45() throws Exception { + testScroll(1, 100, 45); + } + + @Test public void shard1docs100size63() throws Exception { + testScroll(1, 100, 63); + } + + @Test public void shard1docs100size89() throws Exception { + testScroll(1, 100, 89); + } + + @Test public void shard1docs100size120() throws Exception { + testScroll(1, 100, 120); + } + + @Test public void shard3docs100size3() throws Exception { + testScroll(3, 100, 3); + } + + @Test public void shard3docs100size7() throws Exception { + testScroll(3, 100, 7); + } + + @Test public void shard3docs100size13() throws Exception { + testScroll(3, 100, 13); + } + + @Test public void shard3docs100size24() throws Exception { + testScroll(3, 100, 24); + } + + @Test public void shard3docs100size45() throws Exception { + testScroll(3, 100, 45); + } + + @Test public void shard3docs100size63() throws Exception { + testScroll(3, 100, 63); + } + + @Test public void shard3docs100size89() throws Exception { + testScroll(3, 100, 89); + } + + @Test public void shard3docs100size120() throws Exception { + testScroll(3, 100, 120); + } + + private void testScroll(int numberOfShards, long numberOfDocs, int size) throws Exception { try { client.admin().indices().prepareDelete("test").execute().actionGet(); } catch (Exception e) { // ignore } - client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); + client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)).execute().actionGet(); client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); Set ids = Sets.newHashSet(); Set expectedIds = Sets.newHashSet(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < numberOfDocs; i++) { String id = Integer.toString(i); expectedIds.add(id); client.prepareIndex("test", "type1", id).setSource("field", i).execute().actionGet(); @@ -122,21 +142,22 @@ public class SearchScanTests extends AbstractNodesTests { SearchResponse searchResponse = client.prepareSearch() .setSearchType(SearchType.SCAN) .setQuery(matchAllQuery()) - .setSize(10) + .setSize(size) .setScroll(TimeValue.timeValueMinutes(2)) .execute().actionGet(); - assertThat(searchResponse.hits().totalHits(), equalTo(100l)); + assertThat(searchResponse.hits().totalHits(), equalTo(numberOfDocs)); // start scrolling, until we get not results while (true) { searchResponse = client.prepareSearchScroll(searchResponse.scrollId()).setScroll(TimeValue.timeValueMinutes(2)).execute().actionGet(); + assertThat(searchResponse.hits().totalHits(), equalTo(numberOfDocs)); assertThat(searchResponse.failedShards(), equalTo(0)); for (SearchHit hit : searchResponse.hits()) { assertThat(hit.id() + "should not exists in the result set", ids.contains(hit.id()), equalTo(false)); ids.add(hit.id()); } - if (searchResponse.hits().totalHits() == 0) { + if (searchResponse.hits().hits().length == 0) { break; } }