From 103bf9f1b980063768ad93fbef872d79f47a62f7 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 5 Aug 2020 14:36:41 +0200 Subject: [PATCH] DATAES-894 - Adapt to changes in Reactor This commit updates the `DefaultReactiveElasticsearchClient` after changes in Reactor around `Processors` and `Sinks`. Original pull request: #498. --- .../DefaultReactiveElasticsearchClient.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index c3f6f7c62..8c05bca60 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -23,9 +23,8 @@ import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxIdentityProcessor; +import reactor.core.publisher.FluxProcessor; import reactor.core.publisher.Mono; -import reactor.core.publisher.Processors; import reactor.core.publisher.Sinks; import reactor.netty.http.client.HttpClient; import reactor.netty.transport.ProxyProvider; @@ -467,8 +466,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch searchRequest.scroll(scrollTimeout); } - Sinks.StandaloneFluxSink request = Sinks.unicast(); - FluxIdentityProcessor inbound = Processors.unicast(); + Sinks.Many request = Sinks.many().unicast().onBackpressureBuffer(); + + FluxProcessor inbound = FluxProcessor.fromSink(Sinks.many().unicast().onBackpressureBuffer()); Flux exchange = request.asFlux().flatMap(it -> { @@ -495,7 +495,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch if (isEmpty(searchResponse.getHits())) { inbound.onComplete(); - request.complete(); + request.emitComplete(); } else { @@ -503,7 +503,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId()) .scroll(scrollTimeout); - request.next(searchScrollRequest); + request.emitNext(searchScrollRequest); } }).map(SearchResponse::getHits) // @@ -511,7 +511,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return searchHits.doOnSubscribe(ignore -> { exchange.subscribe(inbound); - request.next(searchRequest); + request.emitNext(searchRequest); }); }, state -> cleanupScroll(headers, state), //