diff --git a/pom.xml b/pom.xml index 3873b0ba6..48e8b27ba 100644 --- a/pom.xml +++ b/pom.xml @@ -117,6 +117,7 @@ org.springframework spring-webflux + 5.3.0-SNAPSHOT true diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index d17cbf3cc..984d0785b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -107,7 +107,6 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.suggest.Suggest; import org.reactivestreams.Publisher; - import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; @@ -119,7 +118,6 @@ import org.springframework.data.elasticsearch.client.util.ScrollState; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.lang.Nullable; @@ -574,7 +572,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) */ @Override - public Mono execute(ReactiveElasticsearchClientCallback callback) { + public Mono execute(ReactiveElasticsearchClientCallback callback) { return this.hostProvider.getActive(Verification.LAZY) // .flatMap(callback::doWithClient) // @@ -617,11 +615,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch String logId = ClientLogger.newLogId(); - return execute(webClient -> sendRequest(webClient, logId, request, headers)) - .flatMapMany(response -> readResponseBody(logId, request, response, responseType)); + return Flux.from(execute(webClient -> sendRequest(webClient, logId, request, headers) + .exchangeToMono(clientResponse -> Mono.from(readResponseBody(logId, request, clientResponse, responseType))))); } - private Mono sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) { + private RequestBodySpec sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) { RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) // .uri(builder -> { @@ -669,9 +667,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); } - return requestBodySpec // - .exchange() // - .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); + return requestBodySpec; } // region indices operations diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 61f1e6005..9130e9f1e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -616,7 +616,7 @@ public interface ReactiveElasticsearchClient { * @return the {@link Mono} emitting the {@link ClientResponse} once subscribed. */ @SuppressWarnings("JavaDoc") - Mono execute(ReactiveElasticsearchClientCallback callback); + Mono execute(ReactiveElasticsearchClientCallback callback); /** * Get the current client {@link Status}.
@@ -633,8 +633,8 @@ public interface ReactiveElasticsearchClient { * @author Christoph Strobl * @since 3.2 */ - interface ReactiveElasticsearchClientCallback { - Mono doWithClient(WebClient client); + interface ReactiveElasticsearchClientCallback { + Mono doWithClient(WebClient client); } /** diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java index 7e7000bd1..a4dcd7003 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java @@ -19,8 +19,8 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.search.internal.SearchContext.*; import static org.mockito.Mockito.*; -import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.util.function.Function; @@ -28,14 +28,14 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Request; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.web.reactive.function.client.ClientResponse; -import reactor.test.StepVerifier; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; /** * @author Peter-Josef Meisch @@ -58,7 +58,7 @@ class DefaultReactiveElasticsearchClientTest { } }) { @Override - public Mono execute(ReactiveElasticsearchClientCallback callback) { + public Mono execute(ReactiveElasticsearchClientCallback callback) { return Mono.empty(); } };