From bee7dbf65f7769091f2896a9fdd42433468dcdef Mon Sep 17 00:00:00 2001
From: Mark Paluch
Date: Mon, 12 Oct 2020 15:50:19 +0200
Subject: [PATCH] =?UTF-8?q?DATAES-947=20-=20Use=20Flux.expand(=E2=80=A6)?=
 =?UTF-8?q?=20for=20recursive=20reactive=20paging.?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We now use Flux.expand(…) to recursively fetch search results (a SearchRequest
followed by multiple SearchScrollRequests) until all search hits are consumed.
Previously we used inbound/outbound sinks to mimic continuations by sending a
request once the previous request had finished. The recursive operator now
allows for a simplified operator chain along with improved readability.
---
 .../DefaultReactiveElasticsearchClient.java   | 58 +++++--------------
 1 file changed, 14 insertions(+), 44 deletions(-)

diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java
index 86bdc04f3..67034c1a8 100644
--- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java
+++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java
@@ -46,7 +46,6 @@ import javax.net.ssl.SSLContext;
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -467,59 +466,30 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
 			searchRequest.scroll(scrollTimeout);
 		}
 
-		Sinks.Many<ActionRequest> requests = Sinks.many().unicast().onBackpressureBuffer();
-		Sinks.Many<SearchResponse> inbound = Sinks.many().unicast().onBackpressureBuffer();
-
-		Flux<SearchResponse> exchange = requests.asFlux().flatMap(it -> {
-
-			if (it instanceof SearchRequest) {
-				return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
-			} else if (it instanceof SearchScrollRequest) {
-				return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
-			} else if (it instanceof ClearScrollRequest) {
-				return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
-						.flatMap(discard -> Flux.empty());
-			}
-
-			return Flux.error(new IllegalArgumentException(String
-					.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)));
-		});
-
 		return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
-				scrollState -> {
+				state -> {
 
-					Flux<SearchResponse> searchHits = inbound.asFlux()
-							.handle((searchResponse, sink) -> {
+					return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
+							.expand(searchResponse -> {
 
-						scrollState.updateScrollId(searchResponse.getScrollId());
-						if (isEmpty(searchResponse.getHits())) {
+								state.updateScrollId(searchResponse.getScrollId());
+								if (isEmpty(searchResponse.getHits())) {
+									return Mono.empty();
+								}
 
-							inbound.tryEmitComplete();
-							requests.tryEmitComplete();
-
-						} else {
-
-							sink.next(searchResponse);
-
-							SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
-									.scroll(scrollTimeout);
-							requests.emitNext(searchScrollRequest, Sinks.EmitFailureHandler.FAIL_FAST);
-						}
-
-					}).map(SearchResponse::getHits) //
-							.flatMap(Flux::fromIterable);
-
-					return searchHits.doOnSubscribe(ignore -> {
-						exchange.subscribe(new SinkSubscriber(inbound));
-						requests.emitNext(searchRequest, Sinks.EmitFailureHandler.FAIL_FAST);
-					});
+								return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
+										requestCreator.scroll(), SearchResponse.class, headers);
+							});
 				},
 				state -> cleanupScroll(headers, state), //
 				(state, ex) -> cleanupScroll(headers, state), //
-				state -> cleanupScroll(headers, state)); //
+				state -> cleanupScroll(headers, state)) //
+						.filter(it -> !isEmpty(it.getHits())) //
+						.map(SearchResponse::getHits) //
+						.flatMapIterable(Function.identity()); //
 	}
 
 	private static boolean isEmpty(@Nullable SearchHits hits) {
 		return hits != null && hits.getHits() != null && hits.getHits().length == 0;
 	}
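
For reference, below is a minimal, self-contained sketch of the Flux.expand(…) paging pattern this patch applies, using only reactor-core. The Page type and the fetchFirstPage/fetchNextPage methods are hypothetical stand-ins for the SearchRequest/SearchScrollRequest round trips performed by sendRequest(…) in the actual client; the operator chain (expand, filter, map, flatMapIterable) mirrors the one introduced above.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical sketch of recursive paging via Flux.expand(…); not part of the patch.
class ExpandPagingSketch {

	// Stand-in for a single page of results plus the cursor (scroll id) to request the next page.
	static class Page {
		final List<String> hits;
		final String cursor;

		Page(List<String> hits, String cursor) {
			this.hits = hits;
			this.cursor = cursor;
		}
	}

	Flux<String> allHits() {

		return fetchFirstPage() //
				// expand(…) subscribes to the Publisher returned for each emitted element, so every
				// page recursively triggers the request for its successor …
				.expand(page -> page.hits.isEmpty() //
						? Mono.empty() // … and an empty page terminates the recursion
						: fetchNextPage(page.cursor))
				.filter(page -> !page.hits.isEmpty()) //
				.map(page -> page.hits) //
				.flatMapIterable(Function.identity());
	}

	// Stand-ins for the initial search and the follow-up scroll request (sendRequest(…) in the client).
	Mono<Page> fetchFirstPage() {
		return Mono.just(new Page(Arrays.asList("hit-1", "hit-2"), "scroll-1"));
	}

	Mono<Page> fetchNextPage(String cursor) {
		return Mono.just(new Page(Collections.emptyList(), null));
	}
}

Unlike the sink-based approach, no manual completion signals or re-subscription bookkeeping is needed: each response itself decides whether to produce the next request, and an empty response ends the stream.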