From da9de6bc49e878e86bdb864b8fb8378077b8512d Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 5 Dec 2018 14:57:00 +0100 Subject: [PATCH] DATAES-510 - Add reactive scroll support. The ReactiveElasticsearchClient now supports scrolling through large result sets by issuing subsequent _search/scroll requests while emitting data on the outbound channel. Resources bound via their scrollId get freed on completion of the flux. Original Pull Request: #231 --- .../DefaultReactiveElasticsearchClient.java | 122 ++++++++++++++++++ .../reactive/ReactiveElasticsearchClient.java | 26 ++++ .../ReactiveElasticsearchClientTests.java | 43 +++++- 3 files changed, 188 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 15e32ca38..336088f8b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -20,7 +20,9 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; import reactor.netty.tcp.TcpClient; @@ -31,7 +33,10 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map.Entry; import java.util.Optional; import 
java.util.concurrent.TimeUnit; @@ -53,11 +58,15 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.main.MainRequest; import org.elasticsearch.action.main.MainResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Request; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; @@ -67,6 +76,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.ElasticsearchException; @@ -85,6 +95,7 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientRequest; @@ -297,6 +308,74 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .flatMap(Flux::fromIterable); } + /* + * (non-Javadoc) + * @see 
org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
+	 */
+	@Override
+	public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
+
+		TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive()
+				: TimeValue.timeValueMinutes(1);
+		// the initial search needs the keep alive as well, so the default is set on the given request.
+		if (searchRequest.scroll() == null) {
+			searchRequest.scroll(scrollTimeout);
+		}
+		// outbound carries the requests to send, inbound the responses received for them.
+		EmitterProcessor<ActionRequest> outbound = EmitterProcessor.create(false);
+		FluxSink<ActionRequest> request = outbound.sink();
+
+		EmitterProcessor<SearchResponse> inbound = EmitterProcessor.create(false);
+
+		Flux<SearchResponse> exchange = outbound.startWith(searchRequest).flatMap(it -> {
+
+			if (it instanceof SearchRequest) {
+				return sendRequest((SearchRequest) it, RequestCreator.search(), SearchResponse.class, headers);
+			} else if (it instanceof SearchScrollRequest) {
+				return sendRequest((SearchScrollRequest) it, RequestCreator.scroll(), SearchResponse.class, headers);
+			} else if (it instanceof ClearScrollRequest) {
+				return sendRequest((ClearScrollRequest) it, RequestCreator.clearScroll(), ClearScrollResponse.class, headers)
+						.flatMap(discard -> Flux.empty());
+			}
+
+			throw new IllegalArgumentException(String.format(
+					"Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
+		});
+
+		ScrollState state = new ScrollState();
+
+		Flux<SearchHit> searchHits = inbound.doOnNext(response -> state.updateScrollId(response.getScrollId())) //
+				.<SearchResponse> handle((searchResponse, sink) -> {
+
+					if (searchResponse.getHits() != null && searchResponse.getHits().getHits() != null
+							&& searchResponse.getHits().getHits().length == 0) {
+
+						inbound.onComplete();
+						outbound.onComplete();
+
+					} else {
+
+						sink.next(searchResponse);
+
+						SearchScrollRequest searchScrollRequest = new SearchScrollRequest(state.getScrollId()).scroll(scrollTimeout);
+						request.next(searchScrollRequest);
+					}
+
+				}).map(SearchResponse::getHits) //
+				.flatMap(Flux::fromIterable) //
+				.doFinally(signalType -> {
+					// clear on completion, error and cancel (eg. take(n)); fire and forget, scrollTimeout is the safety net.
+					List<String> scrollIds = state.getScrollIds();
+					if (!scrollIds.isEmpty()) {
+						ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+						clearScrollRequest.scrollIds(scrollIds);
+						sendRequest(clearScrollRequest, RequestCreator.clearScroll(), ClearScrollResponse.class, headers).subscribe();
+					}
+				});
+
+		return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
+	}
+
 	/*
 	 * (non-Javadoc)
 	 * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
@@ -482,6 +561,14 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 			return RequestConverters::search;
 		}
 
+		static Function<SearchScrollRequest, Request> scroll() {
+			return RequestConverters::searchScroll;
+		}
+
+		static Function<ClearScrollRequest, Request> clearScroll() {
+			return RequestConverters::clearScroll;
+		}
+
 		static Function<IndexRequest, Request> index() {
 			return RequestConverters::index;
 		}
@@ -549,4 +636,50 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 			return connectedHosts;
 		}
 	}
+
+	/**
+	 * Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}. Reads and writes
+	 * share one lock, so an id recorded by one thread is seen by the thread that issues the next request.
+	 *
+	 * @author Christoph Strobl
+	 * @since 4.0
+	 */
+	private static class ScrollState {
+
+		private final Object lock = new Object();
+
+		private String scrollId;
+		private final List<String> pastIds = new ArrayList<>(1);
+
+		String getScrollId() {
+
+			synchronized (lock) {
+				return scrollId;
+			}
+		}
+
+		List<String> getScrollIds() {
+
+			// hand out a snapshot; an unmodifiable view would still change underneath the caller.
+			synchronized (lock) {
+				return Collections.unmodifiableList(new ArrayList<>(pastIds));
+			}
+		}
+
+		void updateScrollId(String scrollId) {
+
+			if (StringUtils.hasText(scrollId)) {
+
+				synchronized (lock) {
+
+					// the same id may be returned for subsequent pages, record it only once in a row.
+					if (!scrollId.equals(this.scrollId)) {
+						pastIds.add(scrollId);
+					}
+					this.scrollId = scrollId;
+				}
+			}
+		}
+
+	}
 }
diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java
index a46e8ff32..cfe37a8b6 100644
--- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java
+++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java
@@ -30,6 +30,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.main.MainResponse;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.index.get.GetResult;
@@ -350,6 +351,31 @@ public interface ReactiveElasticsearchClient {
 	 */
 	Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
 
+	/**
+	 * Execute the given {@link SearchRequest} against the {@literal search scroll} API.
+	 *
+	 * @param searchRequest must not be {@literal null}.
+	 * @see Search
+	 *      Scroll API on elastic.co
+	 * @return the {@link Flux} emitting {@link SearchHit hits} one by one.
+	 */
+	default Flux<SearchHit> scroll(SearchRequest searchRequest) {
+		return scroll(HttpHeaders.EMPTY, searchRequest);
+	}
+
+	/**
+	 * Execute the given {@link SearchRequest} against the {@literal search scroll} API.
+	 * Scroll keeps track of {@link SearchResponse#getScrollId() scrollIds} returned by the server and provides them when
+	 * requesting more results via {@code _search/scroll}. All bound server resources are freed on completion.
+	 *
+	 * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
+	 * @param searchRequest must not be {@literal null}.
+	 * @see Search
+	 *      Scroll API on elastic.co
+	 * @return the {@link Flux} emitting {@link SearchHit hits} one by one.
+	 */
+	Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest);
+
 	/**
 	 * Execute a {@link DeleteByQueryRequest} against the {@literal delete by query} API.
 	 *
diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java
index 28c2a5ff4..83fdb4d9d 100644
--- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java
+++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
@@ -36,7 +37,9 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
@@ -50,6 +53,7 @@ import org.springframework.data.elasticsearch.ElasticsearchVersion;
 import org.springframework.data.elasticsearch.ElasticsearchVersionRule;
 import org.springframework.data.elasticsearch.TestUtils;
 import org.springframework.data.elasticsearch.client.ClientConfiguration;
+import org.springframework.http.HttpHeaders;
 import org.springframework.lang.Nullable;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringRunner;
@@ -111,8 +115,8 @@ public class ReactiveElasticsearchClientTests {
 	public void pingForUnknownHostShouldReturnFalse() {
 
 		DefaultReactiveElasticsearchClient
-				.create(ClientConfiguration.builder().connectedTo("localhost:4711")
-						.withConnectTimeout(Duration.ofSeconds(2)).build())
+				.create(ClientConfiguration.builder().connectedTo("localhost:4711").withConnectTimeout(Duration.ofSeconds(2))
+						.build())
 				.ping() //
 				.as(StepVerifier::create) //
 				.expectNext(false) //
@@ -413,7 +417,7 @@ public class ReactiveElasticsearchClientTests {
 	@Test // DATAES-488
 	public void searchShouldCompleteIfNothingFound() throws IOException {
 
-		syncClient.indices().create(new CreateIndexRequest(INDEX_I));
+		syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT);
 
 		SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
 				.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
@@ -460,6 +464,39 @@ public class ReactiveElasticsearchClientTests {
 				.verifyComplete();
 	}
 
+	@Test // DATAES-510
+	public void scrollShouldReadWhileEndNotReached() {
+		// index 100 documents and expect every one of them to be emitted before the flux completes.
+		IntStream.range(0, 100).forEach(it -> add(Collections.singletonMap(it + "-foo", "bar")).ofType(TYPE_I).to(INDEX_I));
+
+		SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
+				.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
+
+		request = request.scroll(TimeValue.timeValueMinutes(1));
+
+		client.scroll(HttpHeaders.EMPTY, request) //
+				.as(StepVerifier::create) //
+				.expectNextCount(100) //
+				.verifyComplete();
+	}
+
+	@Test // DATAES-510
+	public void scrollShouldReadWhileTakeNotReached() {
+		// take(73) ends the scroll before all 100 indexed documents have been read.
+		IntStream.range(0, 100).forEach(it -> add(Collections.singletonMap(it + "-foo", "bar")).ofType(TYPE_I).to(INDEX_I));
+
+		SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
+				.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
+
+		request = request.scroll(TimeValue.timeValueMinutes(1));
+
+		client.scroll(HttpHeaders.EMPTY, request) //
+				.take(73) //
+				.as(StepVerifier::create) //
+				.expectNextCount(73) //
+				.verifyComplete();
+	}
+
 	AddToIndexOfType addSourceDocument() {
 		return add(DOC_SOURCE);
 	}