DATAES-921 - Favour exchangeToMono over deprecated exchange.

Original PR: #530
This commit is contained in:
Peter-Josef Meisch 2020-10-06 20:58:45 +02:00
parent 2a8c1dbdf8
commit 980aff30ae
No known key found for this signature in database
GPG Key ID: DE108246970C7708
4 changed files with 13 additions and 16 deletions

View File

@ -117,6 +117,7 @@
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId> <artifactId>spring-webflux</artifactId>
<version>5.3.0-SNAPSHOT</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>

View File

@ -107,7 +107,6 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost;
@ -119,7 +118,6 @@ import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.util.Lazy; import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -574,7 +572,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
*/ */
@Override @Override
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) { public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
return this.hostProvider.getActive(Verification.LAZY) // return this.hostProvider.getActive(Verification.LAZY) //
.flatMap(callback::doWithClient) // .flatMap(callback::doWithClient) //
@ -617,11 +615,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
String logId = ClientLogger.newLogId(); String logId = ClientLogger.newLogId();
return execute(webClient -> sendRequest(webClient, logId, request, headers)) return Flux.from(execute(webClient -> sendRequest(webClient, logId, request, headers)
.flatMapMany(response -> readResponseBody(logId, request, response, responseType)); .exchangeToMono(clientResponse -> Mono.from(readResponseBody(logId, request, clientResponse, responseType)))));
} }
private Mono<ClientResponse> sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) { private RequestBodySpec sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) // RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
.uri(builder -> { .uri(builder -> {
@ -669,9 +667,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
} }
return requestBodySpec // return requestBodySpec;
.exchange() //
.onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
} }
// region indices operations // region indices operations

View File

@ -616,7 +616,7 @@ public interface ReactiveElasticsearchClient {
* @return the {@link Mono} emitting the {@link ClientResponse} once subscribed. * @return the {@link Mono} emitting the {@link ClientResponse} once subscribed.
*/ */
@SuppressWarnings("JavaDoc") @SuppressWarnings("JavaDoc")
Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback); <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback);
/** /**
* Get the current client {@link Status}. <br /> * Get the current client {@link Status}. <br />
@ -633,8 +633,8 @@ public interface ReactiveElasticsearchClient {
* @author Christoph Strobl * @author Christoph Strobl
* @since 3.2 * @since 3.2
*/ */
interface ReactiveElasticsearchClientCallback { interface ReactiveElasticsearchClientCallback<T> {
Mono<ClientResponse> doWithClient(WebClient client); Mono<T> doWithClient(WebClient client);
} }
/** /**

View File

@ -19,8 +19,8 @@ import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.search.internal.SearchContext.*; import static org.elasticsearch.search.internal.SearchContext.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.function.Function; import java.util.function.Function;
@ -28,14 +28,14 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import reactor.test.StepVerifier;
/** /**
* @author Peter-Josef Meisch * @author Peter-Josef Meisch
@ -58,7 +58,7 @@ class DefaultReactiveElasticsearchClientTest {
} }
}) { }) {
@Override @Override
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) { public Mono<ResponseSpec> execute(ReactiveElasticsearchClientCallback callback) {
return Mono.empty(); return Mono.empty();
} }
}; };