DATAES-947 - Adopt to API changes in Project Reactor.

Use Sinks API as subscriber instead of FluxProcessor.
This commit is contained in:
Mark Paluch 2020-10-12 12:52:23 +02:00
parent a5d9e929d9
commit d80a4bdaa1
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849

View File

@ -22,9 +22,7 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.http.client.HttpClient;
@ -107,6 +105,9 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@ -467,9 +468,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
Sinks.Many<ActionRequest> requests = Sinks.many().unicast().onBackpressureBuffer();
FluxProcessor<SearchResponse, SearchResponse> inbound = FluxProcessor
.fromSink(Sinks.many().unicast().onBackpressureBuffer());
Sinks.Many<SearchResponse> inbound = Sinks.many().unicast().onBackpressureBuffer();
Flux<SearchResponse> exchange = requests.asFlux().flatMap(it -> {
@ -490,12 +489,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
scrollState -> {
Flux<SearchHit> searchHits = inbound.<SearchResponse> handle((searchResponse, sink) -> {
Flux<SearchHit> searchHits = inbound.asFlux().<SearchResponse> handle((searchResponse, sink) -> {
scrollState.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
inbound.onComplete();
inbound.tryEmitComplete();
requests.tryEmitComplete();
} else {
@ -504,15 +503,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
.scroll(scrollTimeout);
tryEmitNext(requests, searchScrollRequest);
requests.emitNext(searchScrollRequest, Sinks.EmitFailureHandler.FAIL_FAST);
}
}).map(SearchResponse::getHits) //
.flatMap(Flux::fromIterable);
return searchHits.doOnSubscribe(ignore -> {
exchange.subscribe(inbound);
tryEmitNext(requests, searchRequest);
exchange.subscribe(new SinkSubscriber(inbound));
requests.emitNext(searchRequest, Sinks.EmitFailureHandler.FAIL_FAST);
});
}, state -> cleanupScroll(headers, state), //
@ -520,14 +519,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
state -> cleanupScroll(headers, state)); //
}
private void tryEmitNext(Sinks.Many<ActionRequest> sink, ActionRequest request) {
Sinks.Emission emission = sink.tryEmitNext(request);
if (emission == Sinks.Emission.FAIL_OVERFLOW) {
sink.tryEmitError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.Many#emitNext"));
}
}
private static boolean isEmpty(@Nullable SearchHits hits) {
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
@ -964,5 +955,34 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
}
private static class SinkSubscriber implements Subscriber<SearchResponse> {
private final Sinks.Many<SearchResponse> inbound;
public SinkSubscriber(Sinks.Many<SearchResponse> inbound) {
this.inbound = inbound;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(SearchResponse searchResponse) {
inbound.emitNext(searchResponse, Sinks.EmitFailureHandler.FAIL_FAST);
}
@Override
public void onError(Throwable t) {
inbound.emitError(t, Sinks.EmitFailureHandler.FAIL_FAST);
}
@Override
public void onComplete() {
inbound.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
}
// endregion
}