From a39c34058bbc9fc10137dc5b536897a52309d368 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 21 Nov 2018 14:45:48 +0100 Subject: [PATCH] DATAES-488 - Polishing & Documentation. Rename VerificationMode -> Verification. Reorder methods in ReactiveElasticsearchClient, add test for DefaultWebClientProvider. Enforce assertions and fix some overall code style issues. Add client reference documentation section. --- src/main/asciidoc/index.adoc | 16 +- .../reference/elasticsearch-clients.adoc | 83 +++++++++ .../DefaultReactiveElasticsearchClient.java | 14 +- .../reactive/DefaultWebClientProvider.java | 53 ++++-- .../client/reactive/HostProvider.java | 118 ++++++------ .../reactive/MultiNodeHostProvider.java | 18 +- .../client/reactive/RawActionResponse.java | 36 ++-- .../reactive/ReactiveElasticsearchClient.java | 170 +++++++++--------- .../client/reactive/ReactiveRestClients.java | 3 +- .../reactive/SingleNodeHostProvider.java | 6 +- .../client/reactive/WebClientProvider.java | 28 ++- .../client/reactive/package-info.java | 6 + .../client/util/RequestConverters.java | 14 +- .../core/ReactiveElasticsearchTemplate.java | 8 +- .../DefaultWebClientProviderUnitTests.java | 43 +++++ .../MultiNodeHostProviderUnitTests.java | 6 +- .../ReactiveMockClientTestsUtils.java | 8 +- 17 files changed, 398 insertions(+), 232 deletions(-) create mode 100644 src/main/asciidoc/reference/elasticsearch-clients.adoc create mode 100644 src/main/java/org/springframework/data/elasticsearch/client/reactive/package-info.java create mode 100644 src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java diff --git a/src/main/asciidoc/index.adoc b/src/main/asciidoc/index.adoc index c00178cff..ef5a87891 100644 --- a/src/main/asciidoc/index.adoc +++ b/src/main/asciidoc/index.adoc @@ -1,10 +1,19 @@ -= Spring Data Elasticsearch -BioMed Central Development Team += Spring Data Elasticsearch - Reference Documentation +BioMed Central Development Team; Oliver Drotbohm; Greg Turnquist; Christoph Strobl; :revnumber: {version} :revdate: {localdate} +:toc: +:toc-placement!: +:linkcss: +:doctype: book +:docinfo: shared +:source-highlighter: prettify +:icons: font +:imagesdir: images +ifdef::backend-epub3[:front-cover-image: image:epub-cover.png[Front Cover,1050,1600]] :spring-data-commons-docs: ../../../../spring-data-commons/src/main/asciidoc -(C) 2013-2015 The original author(s). +(C) 2013-2018 The original author(s). NOTE: Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. @@ -19,6 +28,7 @@ include::{spring-data-commons-docs}/repositories.adoc[] = Reference Documentation :leveloffset: +1 +include::reference/elasticsearch-clients.adoc[] include::reference/data-elasticsearch.adoc[] include::reference/elasticsearch-misc.adoc[] :leveloffset: -1 diff --git a/src/main/asciidoc/reference/elasticsearch-clients.adoc b/src/main/asciidoc/reference/elasticsearch-clients.adoc new file mode 100644 index 000000000..72a9a49d5 --- /dev/null +++ b/src/main/asciidoc/reference/elasticsearch-clients.adoc @@ -0,0 +1,83 @@ +[[elasticsearch.clients]] += Elasticsearch Clients + +This chapter illustrates configuration and usage of supported Elasticsearch client implementations. + +Spring data Elasticsearch operates upon an Elasticsearch client that is connected to a single Elasticsearch node or a cluster. + +WARNING: The well known `TransportClient` is deprecated as of Elasticsearch 7.0.0 and is expected to be removed in Elasticsearch 8.0. + +[[elasticsearch.clients.rest]] +== High Level REST Client + +The Java High Level REST Client provides a straight forward replacement for the `TransportClient` as it accepts and returns +the very same request/response objects and therefore depends on the Elasticsearch core project. +Asynchronous calls are operated upon a client managed thread pool and require a callback to be notified when the request is done. + +.High Level REST Client +==== +[source,java] +---- +static class Config { + + @Bean + RestHighLevelClient client() { + + ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1> + .connectedTo("localhost:9200", "localhost:9201") + .build(); + + return RestClients.create(clientConfiguration).rest(); <2> + } +} + +// ... + +IndexRequest request = new IndexRequest("spring-data", "elasticsearch", randomID()) + .source(singletonMap("feature", "high-level-rest-client")) + .setRefreshPolicy(IMMEDIATE); + +IndexResponse response = client.index(request); +---- +<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL. +<2> Next to the `rest()` client it is also possible to obtain the `lowLevelRest()` client. +==== + +[[elasticsearch.clients.reactive]] +== Reactive Client + +The `ReactiveElasticsearchClient` is a non official driver based on `WebClient`. +It uses the request/response objects provided by the Elasticsearch core project. +Calls are directly operated on the reactive stack, **not** wrapping async (thread pool bound) responses into reactive types. + +.Reactive REST Client +==== +[source,java] +---- +static class Config { + + @Bean + ReactiveElasticsearchClient client() { + + ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1> + .connectedTo("localhost:9200", "localhost:9291") + .build(); + + return ReactiveRestClients.create(clientConfiguration); + } +} + +// ... + +Mono response = client.index(request -> + + request.index("spring-data") + .type("elasticsearch") + .id(randomID()) + .source(singletonMap("feature", "reactive-client")) + .setRefreshPolicy(IMMEDIATE); +); +---- +<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enbale SSL. +==== + diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index ef67033a6..b2f0cb2b9 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -64,7 +64,7 @@ import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; -import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode; +import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification; import org.springframework.data.elasticsearch.client.util.RequestConverters; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -153,9 +153,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch Optional sslContext = clientConfiguration.getSslContext(); - sslContext.ifPresent(it -> { - sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE)); - }); + sslContext.ifPresent(it -> sslConfig.sslContext(new JdkSslContext(it, true, ClientAuth.NONE))); })); provider = WebClientProvider.create("https", connector); } else { @@ -273,13 +271,13 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono execute(ReactiveElasticsearchClientCallback callback) { - return this.hostProvider.getActive(VerificationMode.LAZY) // + return this.hostProvider.getActive(Verification.LAZY) // .flatMap(callback::doWithClient) // .onErrorResume(throwable -> { if (throwable instanceof ConnectException) { - return hostProvider.getActive(VerificationMode.ACTIVE) // + return hostProvider.getActive(Verification.ACTIVE) // .flatMap(callback::doWithClient); } @@ -357,9 +355,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return response.body(BodyExtractors.toMono(byte[].class)) // .map(it -> new String(it, StandardCharsets.UTF_8)) // - .flatMap(content -> { - return doDecode(response, responseType, content); - }); + .flatMap(content -> doDecode(response, responseType, content)); } private static Mono doDecode(ClientResponse response, Class responseType, String content) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java index 82bb08d70..d20b4dfa9 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java @@ -31,6 +31,7 @@ import org.springframework.web.reactive.function.client.WebClient.Builder; * Default {@link WebClientProvider} that uses cached {@link WebClient} instances per {@code hostAndPort}. * * @author Mark Paluch + * @author Christoph Strobl * @since 4.0 */ class DefaultWebClientProvider implements WebClientProvider { @@ -42,19 +43,32 @@ class DefaultWebClientProvider implements WebClientProvider { private final Consumer errorListener; private final HttpHeaders headers; + /** + * Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}. + * + * @param scheme must not be {@literal null}. + * @param connector can be {@literal null}. + */ DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector) { this(scheme, connector, e -> {}, HttpHeaders.EMPTY); } + /** + * Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}. + * + * @param scheme must not be {@literal null}. + * @param connector can be {@literal null}. + * @param errorListener must not be {@literal null}. + * @param headers must not be {@literal null}. + */ private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector, Consumer errorListener, HttpHeaders headers) { - this(new ConcurrentHashMap<>(), scheme, connector, errorListener, headers); - } - private DefaultWebClientProvider(Map cachedClients, String scheme, - @Nullable ClientHttpConnector connector, Consumer errorListener, HttpHeaders headers) { + Assert.notNull(scheme, "Scheme must not be null! A common scheme would be 'http'."); + Assert.notNull(errorListener, "ErrorListener must not be null! You may want use a no-op one 'e -> {}' instead."); + Assert.notNull(headers, "headers must not be null! Think about using 'HttpHeaders.EMPTY' as an alternative."); - this.cachedClients = cachedClients; + this.cachedClients = new ConcurrentHashMap<>(); this.scheme = scheme; this.connector = connector; this.errorListener = errorListener; @@ -70,18 +84,7 @@ class DefaultWebClientProvider implements WebClientProvider { Assert.notNull(endpoint, "Endpoint must not be empty!"); - return this.cachedClients.computeIfAbsent(endpoint, key -> { - - Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders())); - - if (connector != null) { - builder.clientConnector(connector); - } - - String baseUrl = String.format("%s://%s:%d", this.scheme, key.getHostString(), key.getPort()); - return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)) - .build(); - }); + return this.cachedClients.computeIfAbsent(endpoint, this::createWebClientForSocketAddress); } /* @@ -100,7 +103,7 @@ class DefaultWebClientProvider implements WebClientProvider { @Override public WebClientProvider withDefaultHeaders(HttpHeaders headers) { - Assert.notNull(headers, "HttpHeaders must not be null"); + Assert.notNull(headers, "HttpHeaders must not be null."); HttpHeaders merged = new HttpHeaders(); merged.addAll(this.headers); @@ -125,9 +128,21 @@ class DefaultWebClientProvider implements WebClientProvider { @Override public WebClientProvider withErrorListener(Consumer errorListener) { - Assert.notNull(errorListener, "Error listener must not be null"); + Assert.notNull(errorListener, "Error listener must not be null."); Consumer listener = this.errorListener.andThen(errorListener); return new DefaultWebClientProvider(this.scheme, this.connector, listener, this.headers); } + + protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) { + + Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders())); + + if (connector != null) { + builder = builder.clientConnector(connector); + } + + String baseUrl = String.format("%s://%s:%d", this.scheme, socketAddress.getHostString(), socketAddress.getPort()); + return builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build(); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java index 8e194a9f1..54ae09680 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java @@ -36,65 +36,11 @@ import org.springframework.web.reactive.function.client.WebClient; */ public interface HostProvider { - /** - * Lookup an active host in {@link VerificationMode#LAZY lazy} mode utilizing cached {@link ElasticsearchHost}. - * - * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} if none found. - */ - default Mono lookupActiveHost() { - return lookupActiveHost(VerificationMode.LAZY); - } - - /** - * Lookup an active host in using the given {@link VerificationMode}. - * - * @param verificationMode - * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} - * ({@link NoReachableHostException}) if none found. - */ - Mono lookupActiveHost(VerificationMode verificationMode); - - /** - * Get the {@link WebClient} connecting to an active host utilizing cached {@link ElasticsearchHost}. - * - * @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none - * found. - */ - default Mono getActive() { - return getActive(VerificationMode.LAZY); - } - - /** - * Get the {@link WebClient} connecting to an active host. - * - * @param verificationMode must not be {@literal null}. - * @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none - * found. - */ - default Mono getActive(VerificationMode verificationMode) { - return lookupActiveHost(verificationMode).map(this::createWebClient); - } - - /** - * Creates a {@link WebClient} for {@link InetSocketAddress endpoint}. - * - * @param baseUrl - * @return - */ - WebClient createWebClient(InetSocketAddress endpoint); - - /** - * Obtain information about known cluster nodes. - * - * @return the {@link Mono} emitting {@link ClusterInformation} when available. - */ - Mono clusterInfo(); - /** * Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts. * * @param clientProvider must not be {@literal null} . - * @param hosts must not be {@literal null} nor empty. + * @param endpoints must not be {@literal null} nor empty. * @return new instance of {@link HostProvider}. */ static HostProvider provider(WebClientProvider clientProvider, InetSocketAddress... endpoints) { @@ -110,10 +56,66 @@ public interface HostProvider { } /** + * Lookup an active host in {@link Verification#LAZY lazy} mode utilizing cached {@link ElasticsearchHost}. + * + * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} if none found. + */ + default Mono lookupActiveHost() { + return lookupActiveHost(Verification.LAZY); + } + + /** + * Lookup an active host in using the given {@link Verification}. + * + * @param verification + * @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} + * ({@link NoReachableHostException}) if none found. + */ + Mono lookupActiveHost(Verification verification); + + /** + * Get the {@link WebClient} connecting to an active host utilizing cached {@link ElasticsearchHost}. + * + * @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none + * found. + */ + default Mono getActive() { + return getActive(Verification.LAZY); + } + + /** + * Get the {@link WebClient} connecting to an active host. + * + * @param verification must not be {@literal null}. + * @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none + * found. + */ + default Mono getActive(Verification verification) { + return lookupActiveHost(verification).map(this::createWebClient); + } + + /** + * Creates a {@link WebClient} for {@link InetSocketAddress endpoint}. + * + * @param endpoint must not be {@literal null}. + * @return a {@link WebClient} using the the given endpoint as {@literal base url}. + */ + WebClient createWebClient(InetSocketAddress endpoint); + + /** + * Obtain information about known cluster nodes. + * + * @return the {@link Mono} emitting {@link ClusterInformation} when available. + */ + Mono clusterInfo(); + + /** + * {@link Verification} allows to influence the lookup strategy for active hosts. + * * @author Christoph Strobl * @since 4.0 */ - enum VerificationMode { + enum Verification { /** * Actively check for cluster node health. @@ -127,9 +129,9 @@ public interface HostProvider { } /** - * Value object accumulating information about cluster an Elasticsearch cluster. + * Value object accumulating information about an Elasticsearch cluster. * - * @author Christoph Strobll + * @author Christoph Strobl * @since 4.0. */ class ClusterInformation { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java index a5e56a821..86272b4dc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java @@ -74,18 +74,14 @@ class MultiNodeHostProvider implements HostProvider { return this.clientProvider.get(endpoint); } - Collection getCachedHostState() { - return hosts.values(); - } - /* * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification) */ @Override - public Mono lookupActiveHost(VerificationMode verificationMode) { + public Mono lookupActiveHost(Verification verification) { - if (VerificationMode.LAZY.equals(verificationMode)) { + if (Verification.LAZY.equals(verification)) { for (ElasticsearchHost entry : hosts()) { if (entry.isOnline()) { return Mono.just(entry.getEndpoint()); @@ -99,6 +95,10 @@ class MultiNodeHostProvider implements HostProvider { .switchIfEmpty(Mono.error(() -> new NoReachableHostException(new LinkedHashSet<>(getCachedHostState())))); } + Collection getCachedHostState() { + return hosts.values(); + } + private Mono findActiveHostInKnownActives() { return findActiveForSate(State.ONLINE); } @@ -140,9 +140,7 @@ class MultiNodeHostProvider implements HostProvider { return Mono.just(host).zipWith(exchange); }) // - .onErrorContinue((throwable, o) -> { - clientProvider.getErrorListener().accept(throwable); - }); + .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable)); } private List hosts() { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java index b4073249b..e3e1bb76b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RawActionResponse.java @@ -26,7 +26,6 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ClientHttpResponse; -import org.springframework.http.codec.HttpMessageReader; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.client.ClientResponse; @@ -34,6 +33,9 @@ import org.springframework.web.reactive.function.client.ExchangeStrategies; /** * Extension to {@link ActionResponse} that also implements {@link ClientResponse}. + * + * @author Christoph Strobl + * @since 4.0 */ class RawActionResponse extends ActionResponse implements ClientResponse { @@ -43,30 +45,15 @@ class RawActionResponse extends ActionResponse implements ClientResponse { this.delegate = delegate; } - public static RawActionResponse create(ClientResponse response) { + static RawActionResponse create(ClientResponse response) { return new RawActionResponse(response); } - public static Builder builder(ClientResponse other) { - return ClientResponse.from(other); - } - - public static Builder builder(HttpStatus statusCode) { - return ClientResponse.create(statusCode); - } - - public static Builder builder(HttpStatus statusCode, ExchangeStrategies strategies) { - return ClientResponse.create(statusCode, strategies); - } - - public static Builder builder(HttpStatus statusCode, List> messageReaders) { - return ClientResponse.create(statusCode, messageReaders); - } - /* * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#statusCode() */ + @Override public HttpStatus statusCode() { return delegate.statusCode(); } @@ -75,6 +62,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#rawStatusCode() */ + @Override public int rawStatusCode() { return delegate.rawStatusCode(); } @@ -83,6 +71,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#headers() */ + @Override public Headers headers() { return delegate.headers(); } @@ -91,6 +80,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#cookies() */ + @Override public MultiValueMap cookies() { return delegate.cookies(); } @@ -99,6 +89,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#strategies() */ + @Override public ExchangeStrategies strategies() { return delegate.strategies(); } @@ -107,6 +98,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#body(org.springframework.web.reactive.function.BodyExtractor) */ + @Override public T body(BodyExtractor extractor) { return delegate.body(extractor); } @@ -115,6 +107,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToMono(java.lang.Class) */ + @Override public Mono bodyToMono(Class elementClass) { return delegate.bodyToMono(elementClass); } @@ -123,6 +116,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToMono(org.springframework.core.ParameterizedTypeReference) */ + @Override public Mono bodyToMono(ParameterizedTypeReference typeReference) { return delegate.bodyToMono(typeReference); } @@ -131,6 +125,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToFlux(java.lang.Class) */ + @Override public Flux bodyToFlux(Class elementClass) { return delegate.bodyToFlux(elementClass); } @@ -139,6 +134,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#bodyToFlux(org.springframework.core.ParameterizedTypeReference) */ + @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { return delegate.bodyToFlux(typeReference); } @@ -147,6 +143,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#toEntity(java.lang.Class) */ + @Override public Mono> toEntity(Class bodyType) { return delegate.toEntity(bodyType); } @@ -155,6 +152,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#toEntity(org.springframework.core.ParameterizedTypeReference) */ + @Override public Mono> toEntity(ParameterizedTypeReference typeReference) { return delegate.toEntity(typeReference); } @@ -163,6 +161,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#toEntityList(java.lang.Class) */ + @Override public Mono>> toEntityList(Class elementType) { return delegate.toEntityList(elementType); } @@ -171,6 +170,7 @@ class RawActionResponse extends ActionResponse implements ClientResponse { * (non-Javadoc) * @see org.springframework.web.reactive.function.client.ClientResponse#toEntityList(org.springframework.core.ParameterizedTypeReference) */ + @Override public Mono>> toEntityList(ParameterizedTypeReference typeReference) { return delegate.toEntityList(typeReference); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index e398c384f..ba9240580 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -86,18 +86,6 @@ public interface ReactiveElasticsearchClient { */ Mono info(HttpHeaders headers); - /** - * Execute the given {@link GetRequest} against the {@literal get} API to retrieve a document by id. - * - * @param getRequest must not be {@literal null}. - * @see Get API on - * elastic.co - * @return the {@link Mono} emitting the {@link GetResult result}. - */ - default Mono get(GetRequest getRequest) { - return get(HttpHeaders.EMPTY, getRequest); - } - /** * Execute a {@link GetRequest} against the {@literal get} API to retrieve a document by id. * @@ -113,6 +101,18 @@ public interface ReactiveElasticsearchClient { return get(request); } + /** + * Execute the given {@link GetRequest} against the {@literal get} API to retrieve a document by id. + * + * @param getRequest must not be {@literal null}. + * @see Get API on + * elastic.co + * @return the {@link Mono} emitting the {@link GetResult result}. + */ + default Mono get(GetRequest getRequest) { + return get(HttpHeaders.EMPTY, getRequest); + } + /** * Execute the given {@link GetRequest} against the {@literal get} API to retrieve a document by id. * @@ -124,19 +124,6 @@ public interface ReactiveElasticsearchClient { */ Mono get(HttpHeaders headers, GetRequest getRequest); - /** - * Execute the given {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by - * id. - * - * @param multiGetRequest must not be {@literal null}. - * @see Multi Get API on - * elastic.co - * @return the {@link Flux} emitting the {@link GetResult result}. - */ - default Flux multiGet(MultiGetRequest multiGetRequest) { - return multiGet(HttpHeaders.EMPTY, multiGetRequest); - } - /** * Execute a {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by id. * @@ -152,6 +139,19 @@ public interface ReactiveElasticsearchClient { return multiGet(request); } + /** + * Execute the given {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by + * id. + * + * @param multiGetRequest must not be {@literal null}. + * @see Multi Get API on + * elastic.co + * @return the {@link Flux} emitting the {@link GetResult result}. + */ + default Flux multiGet(MultiGetRequest multiGetRequest) { + return multiGet(HttpHeaders.EMPTY, multiGetRequest); + } + /** * Execute the given {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by * id. @@ -164,16 +164,6 @@ public interface ReactiveElasticsearchClient { */ Flux multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest); - /** - * Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise. - * - * @param getRequest must not be {@literal null}. - * @return the {@link Mono} emitting {@literal true} if it exists, {@literal false} otherwise. - */ - default Mono exists(GetRequest getRequest) { - return exists(HttpHeaders.EMPTY, getRequest); - } - /** * Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise. * @@ -187,6 +177,16 @@ public interface ReactiveElasticsearchClient { return exists(request); } + /** + * Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise. + * + * @param getRequest must not be {@literal null}. + * @return the {@link Mono} emitting {@literal true} if it exists, {@literal false} otherwise. + */ + default Mono exists(GetRequest getRequest) { + return exists(HttpHeaders.EMPTY, getRequest); + } + /** * Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise. * @@ -196,18 +196,6 @@ public interface ReactiveElasticsearchClient { */ Mono exists(HttpHeaders headers, GetRequest getRequest); - /** - * Execute the given {@link IndexRequest} against the {@literal index} API to index a document. - * - * @param indexRequest must not be {@literal null}. - * @see Index API on - * elastic.co - * @return the {@link Mono} emitting the {@link IndexResponse}. - */ - default Mono index(IndexRequest indexRequest) { - return index(HttpHeaders.EMPTY, indexRequest); - } - /** * Execute an {@link IndexRequest} against the {@literal index} API to index a document. * @@ -223,6 +211,18 @@ public interface ReactiveElasticsearchClient { return index(request); } + /** + * Execute the given {@link IndexRequest} against the {@literal index} API to index a document. + * + * @param indexRequest must not be {@literal null}. + * @see Index API on + * elastic.co + * @return the {@link Mono} emitting the {@link IndexResponse}. + */ + default Mono index(IndexRequest indexRequest) { + return index(HttpHeaders.EMPTY, indexRequest); + } + /** * Execute the given {@link IndexRequest} against the {@literal index} API to index a document. * @@ -234,18 +234,6 @@ public interface ReactiveElasticsearchClient { */ Mono index(HttpHeaders headers, IndexRequest indexRequest); - /** - * Execute the given {@link UpdateRequest} against the {@literal update} API to alter a document. - * - * @param updateRequest must not be {@literal null}. - * @see Update API on - * elastic.co - * @return the {@link Mono} emitting the {@link UpdateResponse}. - */ - default Mono update(UpdateRequest updateRequest) { - return update(HttpHeaders.EMPTY, updateRequest); - } - /** * Execute an {@link UpdateRequest} against the {@literal update} API to alter a document. * @@ -261,6 +249,18 @@ public interface ReactiveElasticsearchClient { return update(request); } + /** + * Execute the given {@link UpdateRequest} against the {@literal update} API to alter a document. + * + * @param updateRequest must not be {@literal null}. + * @see Update API on + * elastic.co + * @return the {@link Mono} emitting the {@link UpdateResponse}. + */ + default Mono update(UpdateRequest updateRequest) { + return update(HttpHeaders.EMPTY, updateRequest); + } + /** * Execute the given {@link UpdateRequest} against the {@literal update} API to alter a document. * @@ -272,18 +272,6 @@ public interface ReactiveElasticsearchClient { */ Mono update(HttpHeaders headers, UpdateRequest updateRequest); - /** - * Execute the given {@link DeleteRequest} against the {@literal delete} API to remove a document. - * - * @param deleteRequest must not be {@literal null}. - * @see Delete API on - * elastic.co - * @return the {@link Mono} emitting the {@link DeleteResponse}. - */ - default Mono delete(DeleteRequest deleteRequest) { - return delete(HttpHeaders.EMPTY, deleteRequest); - } - /** * Execute a {@link DeleteRequest} against the {@literal delete} API to remove a document. * @@ -299,6 +287,18 @@ public interface ReactiveElasticsearchClient { return delete(request); } + /** + * Execute the given {@link DeleteRequest} against the {@literal delete} API to remove a document. + * + * @param deleteRequest must not be {@literal null}. + * @see Delete API on + * elastic.co + * @return the {@link Mono} emitting the {@link DeleteResponse}. + */ + default Mono delete(DeleteRequest deleteRequest) { + return delete(HttpHeaders.EMPTY, deleteRequest); + } + /** * Execute the given {@link DeleteRequest} against the {@literal delete} API to remove a document. * @@ -310,18 +310,6 @@ public interface ReactiveElasticsearchClient { */ Mono delete(HttpHeaders headers, DeleteRequest deleteRequest); - /** - * Execute the given {@link SearchRequest} against the {@literal search} API. - * - * @param searchRequest must not be {@literal null}. - * @see Search API on - * elastic.co - * @return the {@link Flux} emitting {@link SearchHit hits} one by one. - */ - default Flux search(SearchRequest searchRequest) { - return search(HttpHeaders.EMPTY, searchRequest); - } - /** * Execute a {@link SearchRequest} against the {@literal search} API. * @@ -337,6 +325,18 @@ public interface ReactiveElasticsearchClient { return search(request); } + /** + * Execute the given {@link SearchRequest} against the {@literal search} API. + * + * @param searchRequest must not be {@literal null}. + * @see Search API on + * elastic.co + * @return the {@link Flux} emitting {@link SearchHit hits} one by one. + */ + default Flux search(SearchRequest searchRequest) { + return search(HttpHeaders.EMPTY, searchRequest); + } + /** * Execute the given {@link SearchRequest} against the {@literal search} API. * @@ -364,7 +364,7 @@ public interface ReactiveElasticsearchClient { * NOTE the actual implementation might choose to actively check the current cluster state by pinging * known nodes. * - * @return + * @return the actual {@link Status} information. */ Mono status(); @@ -387,7 +387,7 @@ public interface ReactiveElasticsearchClient { interface Status { /** - * Get the list of known hosts and their getCachedHostState. + * Get the collection of known hosts. * * @return never {@literal null}. */ diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java index f1d7baeac..ab6f44513 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveRestClients.java @@ -16,7 +16,6 @@ package org.springframework.data.elasticsearch.client.reactive; import org.springframework.data.elasticsearch.client.ClientConfiguration; -import org.springframework.data.elasticsearch.client.RestClients.ElasticsearchRestClient; import org.springframework.util.Assert; /** @@ -34,7 +33,7 @@ public final class ReactiveRestClients { /** * Start here to create a new client tailored to your needs. * - * @return new instance of {@link ElasticsearchRestClient}. + * @return new instance of {@link ReactiveElasticsearchClient}. */ public static ReactiveElasticsearchClient create(ClientConfiguration clientConfiguration) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java index 4726a57e6..064bf84f0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java @@ -82,12 +82,12 @@ class SingleNodeHostProvider implements HostProvider { /* * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode) + * @see org.springframework.data.elasticsearch.client.reactive.HostProvider#lookupActiveHost(org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification) */ @Override - public Mono lookupActiveHost(VerificationMode verificationMode) { + public Mono lookupActiveHost(Verification verification) { - if (VerificationMode.LAZY.equals(verificationMode) && state.isOnline()) { + if (Verification.LAZY.equals(verification) && state.isOnline()) { return Mono.just(endpoint); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java index a2633cbb7..375c6c6c6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/WebClientProvider.java @@ -20,6 +20,7 @@ import java.util.function.Consumer; import org.springframework.http.HttpHeaders; import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.web.reactive.function.client.WebClient; @@ -37,6 +38,15 @@ import org.springframework.web.reactive.function.client.WebClient; */ public interface WebClientProvider { + /** + * Creates a new {@link WebClientProvider} using the {@code http} scheme and a default {@link ClientHttpConnector}. + * + * @return the resulting {@link WebClientProvider}. + */ + static WebClientProvider http() { + return create("http"); + } + /** * Creates a new {@link WebClientProvider} using the given {@code scheme} and a default {@link ClientHttpConnector}. * @@ -54,10 +64,10 @@ public interface WebClientProvider { * Creates a new {@link WebClientProvider} given {@code scheme} and {@link ClientHttpConnector}. * * @param scheme protocol scheme such as {@literal http} or {@literal https}. - * @param connector the HTTP connector to use. + * @param connector the HTTP connector to use. Can be {@literal null}. * @return the resulting {@link WebClientProvider}. */ - static WebClientProvider create(String scheme, ClientHttpConnector connector) { + static WebClientProvider create(String scheme, @Nullable ClientHttpConnector connector) { Assert.hasText(scheme, "Protocol scheme must not be empty"); @@ -79,6 +89,13 @@ public interface WebClientProvider { */ HttpHeaders getDefaultHeaders(); + /** + * Obtain the {@link Consumer error listener} to be used; + * + * @return never {@literal null}. + */ + Consumer getErrorListener(); + /** * Create a new instance of {@link WebClientProvider} applying the given headers by default. * @@ -87,13 +104,6 @@ public interface WebClientProvider { */ WebClientProvider withDefaultHeaders(HttpHeaders headers); - /** - * Obtain the {@link Consumer error listener} to be used; - * - * @return never {@literal null}. - */ - Consumer getErrorListener(); - /** * Create a new instance of {@link WebClientProvider} calling the given {@link Consumer} on error. * diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/package-info.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/package-info.java new file mode 100644 index 000000000..2423de032 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/package-info.java @@ -0,0 +1,6 @@ +/** + * Everything required for a Reactive Elasticsearch client. + */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.data.elasticsearch.client.reactive; diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index c2df9e927..ccba7e471 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -481,7 +481,7 @@ public class RequestConverters { .withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) .withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) .withIndicesOptions(updateByQueryRequest.indicesOptions()); - if (updateByQueryRequest.isAbortOnVersionConflict() == false) { + if (!updateByQueryRequest.isAbortOnVersionConflict()) { params.putParam("conflicts", "proceed"); } if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { @@ -505,7 +505,7 @@ public class RequestConverters { .withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards()) .withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond()) .withIndicesOptions(deleteByQueryRequest.indicesOptions()); - if (deleteByQueryRequest.isAbortOnVersionConflict() == false) { + if (!deleteByQueryRequest.isAbortOnVersionConflict()) { params.putParam("conflicts", "proceed"); } if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { @@ -689,7 +689,7 @@ public class RequestConverters { Params withFetchSourceContext(FetchSourceContext fetchSourceContext) { if (fetchSourceContext != null) { - if (fetchSourceContext.fetchSource() == false) { + if (!fetchSourceContext.fetchSource()) { putParam("_source", Boolean.FALSE.toString()); } if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) { @@ -722,7 +722,7 @@ public class RequestConverters { } Params withRealtime(boolean realtime) { - if (realtime == false) { + if (!realtime) { return putParam("realtime", Boolean.FALSE.toString()); } return this; @@ -803,7 +803,7 @@ public class RequestConverters { withIgnoreUnavailable(indicesOptions.ignoreUnavailable()); putParam("allow_no_indices", Boolean.toString(indicesOptions.allowNoIndices())); String expandWildcards; - if (indicesOptions.expandWildcardsOpen() == false && indicesOptions.expandWildcardsClosed() == false) { + if (!indicesOptions.expandWildcardsOpen() && !indicesOptions.expandWildcardsClosed()) { expandWildcards = "none"; } else { StringJoiner joiner = new StringJoiner(","); @@ -827,14 +827,14 @@ public class RequestConverters { Params withHuman(boolean human) { if (human) { - putParam("human", Boolean.toString(human)); + putParam("human", "true"); } return this; } Params withLocal(boolean local) { if (local) { - putParam("local", Boolean.toString(local)); + putParam("local", "true"); } return this; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index d7b9ba234..9833f7ec1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core; import static org.elasticsearch.index.VersionType.*; +import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -222,7 +223,7 @@ public class ReactiveElasticsearchTemplate { protected Mono doIndex(Object value, ElasticsearchPersistentEntity entity, @Nullable String index, @Nullable String type) { - PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(value); + PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(value); Object id = propertyAccessor.getProperty(entity.getIdProperty()); String indexToUse = indexName(index, entity); @@ -298,7 +299,10 @@ public class ReactiveElasticsearchTemplate { List mappedSort = new ArrayList<>(); for (Sort.Order order : query.getSort()) { - FieldSortBuilder sort = SortBuilders.fieldSort(entity.getPersistentProperty(order.getProperty()).getFieldName()) + ElasticsearchPersistentProperty property = entity.getPersistentProperty(order.getProperty()); + String fieldName = property != null ? property.getFieldName() : order.getProperty(); + + FieldSortBuilder sort = SortBuilders.fieldSort(fieldName) .order(order.getDirection().isDescending() ? SortOrder.DESC : SortOrder.ASC); if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java new file mode 100644 index 000000000..86bb568e0 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProviderUnitTests.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.client.reactive; + +import static org.assertj.core.api.Assertions.*; + +import java.net.InetSocketAddress; + +import org.junit.Test; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * @author Christoph Strobl + */ +public class DefaultWebClientProviderUnitTests { + + @Test // DATAES-488 + public void shouldCacheClients() { + + DefaultWebClientProvider provider = new DefaultWebClientProvider("http", null); + + WebClient client1 = provider.get(InetSocketAddress.createUnresolved("localhost", 9200)); + WebClient shouldBeCachedInstanceOfClient1 = provider.get(InetSocketAddress.createUnresolved("localhost", 9200)); + + WebClient notClient1ButAnotherInstance = provider.get(InetSocketAddress.createUnresolved("127.0.0.1", 9200)); + + assertThat(shouldBeCachedInstanceOfClient1).isSameAs(client1); + assertThat(notClient1ButAnotherInstance).isNotSameAs(client1); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java index 55444035f..6662b5aef 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProviderUnitTests.java @@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; +import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -25,7 +26,6 @@ import org.junit.Before; import org.junit.Test; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; -import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive; import org.springframework.web.reactive.function.client.ClientResponse; @@ -92,7 +92,7 @@ public class MultiNodeHostProviderUnitTests { provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); - provider.getActive(VerificationMode.LAZY).as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); + provider.getActive(Verification.LAZY).as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); verify(mock.client(":9201")).head(); } @@ -106,7 +106,7 @@ public class MultiNodeHostProviderUnitTests { provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); - provider.getActive(VerificationMode.ACTIVE).as(StepVerifier::create).expectNext(mock.client(HOST_2)) + provider.getActive(Verification.ACTIVE).as(StepVerifier::create).expectNext(mock.client(HOST_2)) .verifyComplete(); verify(mock.client(HOST_2), times(2)).head(); diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index 455936259..82a6d6f0f 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -122,21 +122,21 @@ public class ReactiveMockClientTestsUtils { return delegate.lookupActiveHost(); } - public Mono lookupActiveHost(VerificationMode verificationMode) { + public Mono lookupActiveHost(Verification verification) { if (StringUtils.hasText(activeDefaultHost)) { return Mono.just(getInetSocketAddress(activeDefaultHost)); } - return delegate.lookupActiveHost(verificationMode); + return delegate.lookupActiveHost(verification); } public Mono getActive() { return delegate.getActive(); } - public Mono getActive(VerificationMode verificationMode) { - return delegate.getActive(verificationMode); + public Mono getActive(Verification verification) { + return delegate.getActive(verification); } public WebClient createWebClient(InetSocketAddress endpoint) {