DATAES-894 - Adapt to changes in Reactor

This commit updates the `DefaultReactiveElasticsearchClient` after
changes in Reactor around `Processors` and `Sinks`.

Original pull request: #498.
This commit is contained in:
Brian Clozel 2020-08-05 14:36:41 +02:00 committed by Mark Paluch
parent b30f12503d
commit 103bf9f1b9
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849

View File

@ -23,9 +23,8 @@ import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Processors;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
import reactor.netty.transport.ProxyProvider;
@ -467,8 +466,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
searchRequest.scroll(scrollTimeout);
}
Sinks.StandaloneFluxSink<ActionRequest> request = Sinks.unicast();
FluxIdentityProcessor<SearchResponse> inbound = Processors.unicast();
Sinks.Many<ActionRequest> request = Sinks.many().unicast().onBackpressureBuffer();
FluxProcessor<SearchResponse, SearchResponse> inbound = FluxProcessor.fromSink(Sinks.many().unicast().onBackpressureBuffer());
Flux<SearchResponse> exchange = request.asFlux().flatMap(it -> {
@ -495,7 +495,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
if (isEmpty(searchResponse.getHits())) {
inbound.onComplete();
request.complete();
request.emitComplete();
} else {
@ -503,7 +503,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
.scroll(scrollTimeout);
request.next(searchScrollRequest);
request.emitNext(searchScrollRequest);
}
}).map(SearchResponse::getHits) //
@ -511,7 +511,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return searchHits.doOnSubscribe(ignore -> {
exchange.subscribe(inbound);
request.next(searchRequest);
request.emitNext(searchRequest);
});
}, state -> cleanupScroll(headers, state), //