DATAES-922 - Move off Sink.emitXXX methods.

We now move oft the deprecated emitXXX(…) methods of the Sink API by switching to the the tryEmitXXX(…) methods.
This commit is contained in:
Mark Paluch 2020-09-14 11:22:04 +02:00
parent d03510528b
commit fd707abdf0
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849

View File

@ -22,6 +22,7 @@ import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
@ -467,12 +468,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
searchRequest.scroll(scrollTimeout);
}
Sinks.Many<ActionRequest> request = Sinks.many().unicast().onBackpressureBuffer();
Sinks.Many<ActionRequest> requests = Sinks.many().unicast().onBackpressureBuffer();
FluxProcessor<SearchResponse, SearchResponse> inbound = FluxProcessor
.fromSink(Sinks.many().unicast().onBackpressureBuffer());
Flux<SearchResponse> exchange = request.asFlux().flatMap(it -> {
Flux<SearchResponse> exchange = requests.asFlux().flatMap(it -> {
if (it instanceof SearchRequest) {
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
@ -483,8 +484,8 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(discard -> Flux.empty());
}
throw new IllegalArgumentException(
String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
return Flux.error(new IllegalArgumentException(String
.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)));
});
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
@ -497,7 +498,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
if (isEmpty(searchResponse.getHits())) {
inbound.onComplete();
request.emitComplete();
requests.tryEmitComplete();
} else {
@ -505,7 +506,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
.scroll(scrollTimeout);
request.emitNext(searchScrollRequest);
tryEmitNext(requests, searchScrollRequest);
}
}).map(SearchResponse::getHits) //
@ -513,7 +514,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return searchHits.doOnSubscribe(ignore -> {
exchange.subscribe(inbound);
request.emitNext(searchRequest);
tryEmitNext(requests, searchRequest);
});
}, state -> cleanupScroll(headers, state), //
@ -521,6 +522,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
state -> cleanupScroll(headers, state)); //
}
private void tryEmitNext(Sinks.Many<ActionRequest> sink, ActionRequest request) {
Sinks.Emission emission = sink.tryEmitNext(request);
if (emission == Sinks.Emission.FAIL_OVERFLOW) {
sink.tryEmitError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.Many#emitNext"));
}
}
private static boolean isEmpty(@Nullable SearchHits hits) {
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
}