diff --git a/pom.xml b/pom.xml
index ca4e38728..758725aa9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -267,6 +267,12 @@
+
+ io.specto
+ hoverfly-java-junit5
+ 0.13.1
+
+
org.apache.xbean
diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java
index 30be17eca..fea95fe70 100644
--- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java
+++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultWebClientProvider.java
@@ -45,7 +45,7 @@ class DefaultWebClientProvider implements WebClientProvider {
private final @Nullable ClientHttpConnector connector;
private final Consumer errorListener;
private final HttpHeaders headers;
- private final String pathPrefix;
+ private final @Nullable String pathPrefix;
private final Function webClientConfigurer;
/**
@@ -61,11 +61,11 @@ class DefaultWebClientProvider implements WebClientProvider {
/**
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
*
- * @param pathPrefix can be {@literal null}
* @param scheme must not be {@literal null}.
* @param connector can be {@literal null}.
* @param errorListener must not be {@literal null}.
* @param headers must not be {@literal null}.
+ * @param pathPrefix can be {@literal null}
* @param webClientConfigurer must not be {@literal null}.
*/
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
@@ -145,7 +145,6 @@ class DefaultWebClientProvider implements WebClientProvider {
}
-
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
@@ -156,7 +155,11 @@ class DefaultWebClientProvider implements WebClientProvider {
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
pathPrefix == null ? "" : '/' + pathPrefix);
- WebClient webClient = builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
+ WebClient webClient = builder //
+ .baseUrl(baseUrl) //
+ .filter((request, next) -> next.exchange(request) //
+ .doOnError(errorListener)) //
+ .build(); //
return webClientConfigurer.apply(webClient);
}
}
diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java
index 253ea5bfc..24360cac7 100644
--- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java
+++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java
@@ -41,7 +41,8 @@ class SingleNodeHostProvider implements HostProvider {
private final InetSocketAddress endpoint;
private volatile ElasticsearchHost state;
- SingleNodeHostProvider(WebClientProvider clientProvider, Supplier headersSupplier, InetSocketAddress endpoint) {
+ SingleNodeHostProvider(WebClientProvider clientProvider, Supplier headersSupplier,
+ InetSocketAddress endpoint) {
this.clientProvider = clientProvider;
this.headersSupplier = headersSupplier;
@@ -57,7 +58,7 @@ class SingleNodeHostProvider implements HostProvider {
public Mono clusterInfo() {
return createWebClient(endpoint) //
- .head().uri("/")
+ .head().uri("/") //
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
.exchange() //
.flatMap(it -> {
@@ -68,7 +69,6 @@ class SingleNodeHostProvider implements HostProvider {
}
return it.releaseBody().thenReturn(state);
}).onErrorResume(throwable -> {
-
state = ElasticsearchHost.offline(endpoint);
clientProvider.getErrorListener().accept(throwable);
return Mono.just(state);
diff --git a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java
index d3e167ead..5d1dbbde9 100644
--- a/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java
+++ b/src/test/java/org/springframework/data/elasticsearch/client/RestClientsTest.java
@@ -2,8 +2,15 @@ package org.springframework.data.elasticsearch.client;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static io.specto.hoverfly.junit.dsl.HoverflyDsl.*;
+import static io.specto.hoverfly.junit.verification.HoverflyVerifications.*;
import static org.assertj.core.api.Assertions.*;
+import io.specto.hoverfly.junit.core.Hoverfly;
+import io.specto.hoverfly.junit5.HoverflyExtension;
+import io.specto.hoverfly.junit5.api.HoverflyCapture;
+import io.specto.hoverfly.junit5.api.HoverflyConfig;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
@@ -13,8 +20,8 @@ import java.util.stream.Stream;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
@@ -27,40 +34,42 @@ import com.github.tomakehurst.wiremock.matching.AnythingPattern;
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
/**
+ * We need hoverfly for testing the reactive code to use a proxy. Wiremock cannot intercept the proxy calls as WebClient
+ * uses HTTP CONNECT on proxy requests which wiremock does not support.
+ *
* @author Peter-Josef Meisch
*/
-@Disabled("SocketException: Socket closed happens on the CLI build while running the test individually succeeds")
+@HoverflyCapture(path = "target/hoverfly", config = @HoverflyConfig(proxyLocalHost = true, plainHttpTunneling = true))
+@ExtendWith(HoverflyExtension.class)
public class RestClientsTest {
@ParameterizedTest // DATAES-700
@MethodSource("clientUnderTestFactorySource")
@DisplayName("should use configured proxy")
- void shouldUseConfiguredProxy(ClientUnderTestFactory clientUnderTestFactory) throws IOException {
-
- if (clientUnderTestFactory instanceof ReactiveElasticsearchClientUnderTestFactory) {
- // although the reactive code is using the proxy for every call - tested with an intercepting
- // proxy - somehow in this test wiremock fails to register this. So we skip it here
- //
- return;
- }
+ void shouldUseConfiguredProxy(ClientUnderTestFactory clientUnderTestFactory, Hoverfly hoverfly) throws IOException {
wireMockServer(server -> {
+ // wiremock is the dummy server, hoverfly the proxy
WireMock.configureFor(server.port());
-
stubFor(head(urlEqualTo("/")).willReturn(aResponse() //
.withHeader("Content-Type", "application/json; charset=UTF-8")));
+ String serviceHost = "localhost:" + server.port();
+ String proxyHost = "localhost:" + hoverfly.getHoverflyConfig().getProxyPort();
+
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
ClientConfiguration clientConfiguration = configurationBuilder //
- .connectedTo("localhost:4711")//
- .withProxy("localhost:" + server.port()) //
+ .connectedTo(serviceHost)//
+ .withProxy(proxyHost) //
.build();
ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration);
- clientUnderTest.ping();
+ boolean result = clientUnderTest.ping();
+ assertThat(result).isTrue();
verify(headRequestedFor(urlEqualTo("/")));
+ hoverfly.verify(service(serviceHost).head("/"), atLeast(1));
});
}