DATAES-947 - Use Flux.expand(…) for recursive reactive paging.

We now use Flux.expand(…) to recursively fetch search results (SearchRequest followed by multiple SearchScrollRequests) until consuming all search hits.

Previously we used inbound/outbound sinks to mimic a continuations by sending a request once the previous request finished. The recursive operator allows now for a simplified operator chain along with improved readability.
This commit is contained in:
Mark Paluch 2020-10-12 15:50:19 +02:00
parent d80a4bdaa1
commit bee7dbf65f
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849

View File

@ -46,7 +46,6 @@ import javax.net.ssl.SSLContext;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -467,59 +466,30 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
searchRequest.scroll(scrollTimeout);
}
Sinks.Many<ActionRequest> requests = Sinks.many().unicast().onBackpressureBuffer();
Sinks.Many<SearchResponse> inbound = Sinks.many().unicast().onBackpressureBuffer();
Flux<SearchResponse> exchange = requests.asFlux().flatMap(it -> {
if (it instanceof SearchRequest) {
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
} else if (it instanceof SearchScrollRequest) {
return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
} else if (it instanceof ClearScrollRequest) {
return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
.flatMap(discard -> Flux.empty());
}
return Flux.error(new IllegalArgumentException(String
.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)));
});
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
scrollState -> {
state -> {
Flux<SearchHit> searchHits = inbound.asFlux().<SearchResponse> handle((searchResponse, sink) -> {
return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
.expand(searchResponse -> {
scrollState.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
state.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
return Mono.empty();
}
inbound.tryEmitComplete();
requests.tryEmitComplete();
} else {
sink.next(searchResponse);
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
.scroll(scrollTimeout);
requests.emitNext(searchScrollRequest, Sinks.EmitFailureHandler.FAIL_FAST);
}
}).map(SearchResponse::getHits) //
.flatMap(Flux::fromIterable);
return searchHits.doOnSubscribe(ignore -> {
exchange.subscribe(new SinkSubscriber(inbound));
requests.emitNext(searchRequest, Sinks.EmitFailureHandler.FAIL_FAST);
});
return sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
requestCreator.scroll(), SearchResponse.class, headers);
});
}, state -> cleanupScroll(headers, state), //
(state, ex) -> cleanupScroll(headers, state), //
state -> cleanupScroll(headers, state)); //
state -> cleanupScroll(headers, state)) //
.filter(it -> !isEmpty(it.getHits())) //
.map(SearchResponse::getHits) //
.flatMapIterable(Function.identity()); //
}
private static boolean isEmpty(@Nullable SearchHits hits) {
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
}