diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 968ffe094..a5ce6cde3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -22,6 +22,7 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxProcessor; import reactor.core.publisher.Mono; @@ -467,12 +468,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch searchRequest.scroll(scrollTimeout); } - Sinks.Many request = Sinks.many().unicast().onBackpressureBuffer(); + Sinks.Many requests = Sinks.many().unicast().onBackpressureBuffer(); FluxProcessor inbound = FluxProcessor .fromSink(Sinks.many().unicast().onBackpressureBuffer()); - Flux exchange = request.asFlux().flatMap(it -> { + Flux exchange = requests.asFlux().flatMap(it -> { if (it instanceof SearchRequest) { return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers); @@ -483,8 +484,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .flatMap(discard -> Flux.empty()); } - throw new IllegalArgumentException( - String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)); + return Flux.error(new IllegalArgumentException(String + .format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it))); }); return Flux.usingWhen(Mono.fromSupplier(ScrollState::new), @@ -497,7 +498,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch if (isEmpty(searchResponse.getHits())) { inbound.onComplete(); - request.emitComplete(); + requests.tryEmitComplete(); } else { @@ -505,7 +506,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId()) .scroll(scrollTimeout); - request.emitNext(searchScrollRequest); + tryEmitNext(requests, searchScrollRequest); } }).map(SearchResponse::getHits) // @@ -513,7 +514,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return searchHits.doOnSubscribe(ignore -> { exchange.subscribe(inbound); - request.emitNext(searchRequest); + tryEmitNext(requests, searchRequest); }); }, state -> cleanupScroll(headers, state), // @@ -521,6 +522,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch state -> cleanupScroll(headers, state)); // } + private void tryEmitNext(Sinks.Many sink, ActionRequest request) { + + Sinks.Emission emission = sink.tryEmitNext(request); + + if (emission == Sinks.Emission.FAIL_OVERFLOW) { + sink.tryEmitError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.Many#emitNext")); + } + } + private static boolean isEmpty(@Nullable SearchHits hits) { return hits != null && hits.getHits() != null && hits.getHits().length == 0; }