diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 2d8f2cada..adcd29c13 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -145,7 +145,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe */ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices { - private final HostProvider hostProvider; + private final HostProvider hostProvider; private final RequestCreator requestCreator; private Supplier headersSupplier = () -> HttpHeaders.EMPTY; @@ -155,7 +155,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * * @param hostProvider must not be {@literal null}. */ - public DefaultReactiveElasticsearchClient(HostProvider hostProvider) { + public DefaultReactiveElasticsearchClient(HostProvider hostProvider) { this(hostProvider, new DefaultRequestCreator()); } @@ -166,7 +166,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * @param hostProvider must not be {@literal null}. * @param requestCreator must not be {@literal null}. */ - public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) { + public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) { Assert.notNull(hostProvider, "HostProvider must not be null"); Assert.notNull(requestCreator, "RequestCreator must not be null"); @@ -535,8 +535,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .flatMap(callback::doWithClient) // .onErrorResume(throwable -> { - if (throwable instanceof ConnectException) { - + if (isCausedByConnectionException(throwable)) { return hostProvider.getActive(Verification.ACTIVE) // .flatMap(callback::doWithClient); } @@ -545,6 +544,27 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch }); } + /** + * checks if the given throwable is a {@link ConnectException} or has one in it's cause chain + * + * @param throwable the throwable to check + * @return true if throwable is caused by a {@link ConnectException} + */ + private boolean isCausedByConnectionException(Throwable throwable) { + + Throwable t = throwable; + do { + + if (t instanceof ConnectException) { + return true; + } + + t = t.getCause(); + } while (t != null); + + return false; + } + @Override public Mono status() { @@ -823,10 +843,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); return response.body(BodyExtractors.toMono(byte[].class)) // - .switchIfEmpty(Mono - .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.", - request.getMethod(), request.getEndpoint(), statusCode), status)) - ) + .switchIfEmpty(Mono.error( + new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.", + request.getMethod(), request.getEndpoint(), statusCode), status))) .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // .flatMap(content -> contentOrError(content, mediaType, status)) .flatMap(unused -> Mono diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java index 2474315dc..ab4d9e096 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/HostProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,9 +34,10 @@ import org.springframework.web.reactive.function.client.WebClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Peter-Josef Meisch * @since 3.2 */ -public interface HostProvider { +public interface HostProvider> { /** * Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts. @@ -46,7 +47,7 @@ public interface HostProvider { * @param endpoints must not be {@literal null} nor empty. * @return new instance of {@link HostProvider}. */ - static HostProvider provider(WebClientProvider clientProvider, Supplier headersSupplier, + static HostProvider provider(WebClientProvider clientProvider, Supplier headersSupplier, InetSocketAddress... endpoints) { Assert.notNull(clientProvider, "WebClientProvider must not be null"); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java index 86e65ffd9..ec8afcf1d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/MultiNodeHostProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -29,6 +30,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; import org.springframework.data.elasticsearch.client.NoReachableHostException; @@ -42,15 +45,19 @@ import org.springframework.web.reactive.function.client.WebClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Peter-Josef Meisch * @since 3.2 */ -class MultiNodeHostProvider implements HostProvider { +class MultiNodeHostProvider implements HostProvider { + + private final static Logger LOG = LoggerFactory.getLogger(MultiNodeHostProvider.class); private final WebClientProvider clientProvider; private final Supplier headersSupplier; private final Map hosts; - MultiNodeHostProvider(WebClientProvider clientProvider, Supplier headersSupplier, InetSocketAddress... endpoints) { + MultiNodeHostProvider(WebClientProvider clientProvider, Supplier headersSupplier, + InetSocketAddress... endpoints) { this.clientProvider = clientProvider; this.headersSupplier = headersSupplier; @@ -58,6 +65,8 @@ class MultiNodeHostProvider implements HostProvider { for (InetSocketAddress endpoint : endpoints) { this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN)); } + + LOG.debug("initialized with " + hosts); } /* @@ -66,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider { */ @Override public Mono clusterInfo() { - return nodes(null).map(this::updateNodeState).buffer(hosts.size()) + return checkNodes(null).map(this::updateNodeState).buffer(hosts.size()) .then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values())))); } @@ -86,14 +95,19 @@ class MultiNodeHostProvider implements HostProvider { @Override public Mono lookupActiveHost(Verification verification) { + LOG.trace("lookupActiveHost " + verification + " from " + hosts()); + if (Verification.LAZY.equals(verification)) { for (ElasticsearchHost entry : hosts()) { if (entry.isOnline()) { + LOG.trace("lookupActiveHost returning " + entry); return Mono.just(entry.getEndpoint()); } } + LOG.trace("no online host found with LAZY"); } + LOG.trace("searching for active host"); return findActiveHostInKnownActives() // .switchIfEmpty(findActiveHostInUnresolved()) // .switchIfEmpty(findActiveHostInDead()) // @@ -105,20 +119,30 @@ class MultiNodeHostProvider implements HostProvider { } private Mono findActiveHostInKnownActives() { - return findActiveForSate(State.ONLINE); + return findActiveForState(State.ONLINE); } private Mono findActiveHostInUnresolved() { - return findActiveForSate(State.UNKNOWN); + return findActiveForState(State.UNKNOWN); } private Mono findActiveHostInDead() { - return findActiveForSate(State.OFFLINE); + return findActiveForState(State.OFFLINE); } - private Mono findActiveForSate(State state) { - return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline) - .map(ElasticsearchHost::getEndpoint).next(); + private Mono findActiveForState(State state) { + + LOG.trace("findActiveForState state " + state + ", current hosts: " + hosts); + + return checkNodes(state) // + .map(this::updateNodeState) // + .filter(ElasticsearchHost::isOnline) // + .map(elasticsearchHost -> { + LOG.trace("findActiveForState returning host " + elasticsearchHost); + return elasticsearchHost; + }).map(ElasticsearchHost::getEndpoint) // + .takeLast(1) // + .next(); } private ElasticsearchHost updateNodeState(Tuple2 tuple2) { @@ -129,17 +153,23 @@ class MultiNodeHostProvider implements HostProvider { return elasticsearchHost; } - private Flux> nodes(@Nullable State state) { + private Flux> checkNodes(@Nullable State state) { + + LOG.trace("checkNodes() with state " + state); return Flux.fromIterable(hosts()) // .filter(entry -> state == null || entry.getState().equals(state)) // .map(ElasticsearchHost::getEndpoint) // - .flatMap(host -> { + .concatMap(host -> { + + LOG.trace("checking host " + host); Mono exchange = createWebClient(host) // .head().uri("/") // .headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) // - .exchange().doOnError(throwable -> { + .exchange() // + .timeout(Duration.ofSeconds(1)) // + .doOnError(throwable -> { hosts.put(host, new ElasticsearchHost(host, State.OFFLINE)); clientProvider.getErrorListener().accept(throwable); }); @@ -147,7 +177,10 @@ class MultiNodeHostProvider implements HostProvider { return Mono.just(host).zipWith(exchange .flatMap(it -> it.releaseBody().thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE))); }) // - .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable)); + .map(tuple -> { + LOG.trace("check result " + tuple); + return tuple; + }).onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable)); } private List hosts() { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java index 253ea5bfc..060ec1e27 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,9 +32,10 @@ import org.springframework.web.reactive.function.client.WebClient; * * @author Christoph Strobl * @author Mark Paluch + * @author Peter-Josef Meisch * @since 3.2 */ -class SingleNodeHostProvider implements HostProvider { +class SingleNodeHostProvider implements HostProvider { private final WebClientProvider clientProvider; private final Supplier headersSupplier; diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index 4b0b4e79d..b4fab9390 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,7 +62,7 @@ public class ReactiveElasticsearchClientUnitTests { static final String HOST = ":9200"; - MockDelegatingElasticsearchHostProvider hostProvider; + MockDelegatingElasticsearchHostProvider> hostProvider; ReactiveElasticsearchClient client; @BeforeEach diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java index 7fc1e5ca4..51f1666fb 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveMockClientTestsUtils.java @@ -186,7 +186,7 @@ public class ReactiveMockClientTestsUtils { return delegate; } - public MockDelegatingElasticsearchHostProvider withActiveDefaultHost(String host) { + public MockDelegatingElasticsearchHostProvider> withActiveDefaultHost(String host) { return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate, host); } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java index a27e22389..03240b223 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProviderUnitTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClient /** * @author Christoph Strobl + * @author Peter-Josef Meisch */ public class SingleNodeHostProviderUnitTests {