From b30f12503dd515a0bff4657677fda2e96a066aa7 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 4 Aug 2020 12:31:59 +0200 Subject: [PATCH] DATAES-893 - Adopt to changes in Project Reactor. --- .../DefaultReactiveElasticsearchClient.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 321f9f4ae..c3f6f7c62 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -22,10 +22,11 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter; import io.netty.handler.ssl.JdkSslContext; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; +import reactor.core.publisher.Processors; +import reactor.core.publisher.Sinks; import reactor.netty.http.client.HttpClient; import reactor.netty.transport.ProxyProvider; @@ -106,6 +107,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.suggest.Suggest; import org.reactivestreams.Publisher; + import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; @@ -465,12 +467,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch searchRequest.scroll(scrollTimeout); } - EmitterProcessor outbound = EmitterProcessor.create(false); - FluxSink request = outbound.sink(); + Sinks.StandaloneFluxSink request = Sinks.unicast(); + FluxIdentityProcessor inbound = Processors.unicast(); - EmitterProcessor inbound = EmitterProcessor.create(false); - - Flux exchange = outbound.startWith(searchRequest).flatMap(it -> { + Flux exchange = request.asFlux().flatMap(it -> { if (it instanceof SearchRequest) { return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers); @@ -495,7 +495,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch if (isEmpty(searchResponse.getHits())) { inbound.onComplete(); - outbound.onComplete(); + request.complete(); } else { @@ -509,10 +509,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch }).map(SearchResponse::getHits) // .flatMap(Flux::fromIterable); - return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound)); + return searchHits.doOnSubscribe(ignore -> { + exchange.subscribe(inbound); + request.next(searchRequest); + }); }, state -> cleanupScroll(headers, state), // - state -> cleanupScroll(headers, state), // + (state, ex) -> cleanupScroll(headers, state), // state -> cleanupScroll(headers, state)); // }