Add support for clear scroll to high level REST client (#25038)

This commit is contained in:
Luca Cavanna 2017-06-06 14:30:42 +02:00 committed by GitHub
parent fbf2e3d574
commit d47d47928b
5 changed files with 130 additions and 0 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActiveShardCount;
@ -344,6 +345,11 @@ final class Request {
return new Request("GET", "/_search/scroll", Collections.emptyMap(), entity);
}
static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
HttpEntity entity = createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE);
return new Request("DELETE", "/_search/scroll", Collections.emptyMap(), entity);
}
private static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) throws IOException {
BytesRef source = XContentHelper.toXContent(toXContent, xContentType, false).toBytesRef();
return new ByteArrayEntity(source.bytes, source.offset, source.length, ContentType.create(xContentType.mediaType()));

View File

@ -36,6 +36,8 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -347,6 +349,28 @@ public class RestHighLevelClient {
listener, emptySet(), headers);
}
/**
* Clears one or more scroll ids using the Clear Scroll api
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#_clear_scroll_api">
* Clear Scroll API on elastic.co</a>
*/
public ClearScrollResponse clearScroll(ClearScrollRequest clearScrollRequest, Header... headers) throws IOException {
return performRequestAndParseEntity(clearScrollRequest, Request::clearScroll, ClearScrollResponse::fromXContent,
emptySet(), headers);
}
/**
* Asynchronously clears one or more scroll ids using the Clear Scroll api
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#_clear_scroll_api">
* Clear Scroll API on elastic.co</a>
*/
public void clearScrollAsync(ClearScrollRequest clearScrollRequest, ActionListener<ClearScrollResponse> listener, Header... headers) {
performRequestAsyncAndParseEntity(clearScrollRequest, Request::clearScroll, ClearScrollResponse::fromXContent,
listener, emptySet(), headers);
}
private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
@ -731,6 +732,21 @@ public class RequestTests extends ESTestCase {
assertEquals("/_search/scroll", request.endpoint);
assertEquals(0, request.params.size());
assertToXContentBody(searchScrollRequest, request.entity);
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaType(), request.entity.getContentType().getValue());
}
public void testClearScroll() throws IOException {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
int numScrolls = randomIntBetween(1, 10);
for (int i = 0; i < numScrolls; i++) {
clearScrollRequest.addScrollId(randomAlphaOfLengthBetween(5, 10));
}
Request request = Request.clearScroll(clearScrollRequest);
assertEquals("DELETE", request.method);
assertEquals("/_search/scroll", request.endpoint);
assertEquals(0, request.params.size());
assertToXContentBody(clearScrollRequest, request.entity);
assertEquals(Request.REQUEST_BODY_CONTENT_TYPE.mediaType(), request.entity.getContentType().getValue());
}
private static void assertToXContentBody(ToXContent expectedBody, HttpEntity actualEntity) throws IOException {

View File

@ -42,6 +42,8 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.SearchScrollRequest;
@ -161,6 +163,19 @@ public class RestHighLevelClientTests extends ESTestCase {
isNotNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}
public void testClearScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
ClearScrollResponse mockClearScrollResponse = new ClearScrollResponse(randomBoolean(), randomIntBetween(0, Integer.MAX_VALUE));
mockResponse(mockClearScrollResponse);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(randomAlphaOfLengthBetween(5, 10));
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, headers);
assertEquals(mockClearScrollResponse.isSucceeded(), clearScrollResponse.isSucceeded());
assertEquals(mockClearScrollResponse.getNumFreed(), clearScrollResponse.getNumFreed());
verify(restClient).performRequest(eq("DELETE"), eq("/_search/scroll"), eq(Collections.emptyMap()),
isNotNull(HttpEntity.class), argThat(new HeadersVarargMatcher(headers)));
}
private void mockResponse(ToXContent toXContent) throws IOException {
Response response = mock(Response.class);
ContentType contentType = ContentType.parse(Request.REQUEST_BODY_CONTENT_TYPE.mediaType());

View File

@ -19,11 +19,19 @@
package org.elasticsearch.client;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.join.aggregations.Children;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
@ -37,6 +45,7 @@ import org.elasticsearch.search.aggregations.matrix.stats.MatrixStats;
import org.elasticsearch.search.aggregations.matrix.stats.MatrixStatsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder;
@ -46,11 +55,14 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
public class SearchIT extends ESRestHighLevelClientTestCase {
@ -386,6 +398,63 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
}
}
public void testSearchScroll() throws Exception {
for (int i = 0; i < 100; i++) {
XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject();
HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
client().performRequest("PUT", "test/type1/" + Integer.toString(i), Collections.emptyMap(), entity);
}
client().performRequest("POST", "/test/_refresh");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35).sort("field", SortOrder.ASC);
SearchRequest searchRequest = new SearchRequest("test").scroll(TimeValue.timeValueMinutes(2)).source(searchSourceBuilder);
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);
try {
long counter = 0;
assertSearchHeader(searchResponse);
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++));
}
searchResponse = execute(new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2)),
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync);
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertEquals(counter++, ((Number) hit.getSortValues()[0]).longValue());
}
searchResponse = execute(new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2)),
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync);
assertThat(searchResponse.getHits().getTotalHits(), equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(30));
for (SearchHit hit : searchResponse.getHits()) {
assertEquals(counter++, ((Number) hit.getSortValues()[0]).longValue());
}
} finally {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(searchResponse.getScrollId());
ClearScrollResponse clearScrollResponse = execute(clearScrollRequest, highLevelClient()::clearScroll,
highLevelClient()::clearScrollAsync);
assertThat(clearScrollResponse.getNumFreed(), greaterThan(0));
assertTrue(clearScrollResponse.isSucceeded());
SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId()).scroll(TimeValue.timeValueMinutes(2));
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> execute(scrollRequest,
highLevelClient()::searchScroll, highLevelClient()::searchScrollAsync));
assertEquals(RestStatus.NOT_FOUND, exception.status());
assertThat(exception.getRootCause(), instanceOf(ElasticsearchException.class));
ElasticsearchException rootCause = (ElasticsearchException) exception.getRootCause();
assertThat(rootCause.getMessage(), containsString("No search context found for"));
}
}
private static void assertSearchHeader(SearchResponse searchResponse) {
assertThat(searchResponse.getTook().nanos(), greaterThanOrEqualTo(0L));
assertEquals(0, searchResponse.getFailedShards());