mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-31 09:12:11 +00:00
DATAES-971 - Fix tests for using a proxy with reactive client
Original PR: #551
This commit is contained in:
parent
6db1aa6871
commit
aa763efe7a
6
pom.xml
6
pom.xml
@ -267,6 +267,12 @@
|
|||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.specto</groupId>
|
||||||
|
<artifactId>hoverfly-java-junit5</artifactId>
|
||||||
|
<version>0.13.1</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Upgrade xbean to 4.5 to prevent incompatibilities due to ASM versions -->
|
<!-- Upgrade xbean to 4.5 to prevent incompatibilities due to ASM versions -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.xbean</groupId>
|
<groupId>org.apache.xbean</groupId>
|
||||||
|
@ -45,7 +45,7 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
private final @Nullable ClientHttpConnector connector;
|
private final @Nullable ClientHttpConnector connector;
|
||||||
private final Consumer<Throwable> errorListener;
|
private final Consumer<Throwable> errorListener;
|
||||||
private final HttpHeaders headers;
|
private final HttpHeaders headers;
|
||||||
private final String pathPrefix;
|
private final @Nullable String pathPrefix;
|
||||||
private final Function<WebClient, WebClient> webClientConfigurer;
|
private final Function<WebClient, WebClient> webClientConfigurer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -61,11 +61,11 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
/**
|
/**
|
||||||
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
* Create new {@link DefaultWebClientProvider} with empty {@link HttpHeaders} and no-op {@literal error listener}.
|
||||||
*
|
*
|
||||||
* @param pathPrefix can be {@literal null}
|
|
||||||
* @param scheme must not be {@literal null}.
|
* @param scheme must not be {@literal null}.
|
||||||
* @param connector can be {@literal null}.
|
* @param connector can be {@literal null}.
|
||||||
* @param errorListener must not be {@literal null}.
|
* @param errorListener must not be {@literal null}.
|
||||||
* @param headers must not be {@literal null}.
|
* @param headers must not be {@literal null}.
|
||||||
|
* @param pathPrefix can be {@literal null}
|
||||||
* @param webClientConfigurer must not be {@literal null}.
|
* @param webClientConfigurer must not be {@literal null}.
|
||||||
*/
|
*/
|
||||||
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
|
private DefaultWebClientProvider(String scheme, @Nullable ClientHttpConnector connector,
|
||||||
@ -145,7 +145,6 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
|
protected WebClient createWebClientForSocketAddress(InetSocketAddress socketAddress) {
|
||||||
|
|
||||||
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
|
Builder builder = WebClient.builder().defaultHeaders(it -> it.addAll(getDefaultHeaders()));
|
||||||
@ -156,7 +155,11 @@ class DefaultWebClientProvider implements WebClientProvider {
|
|||||||
|
|
||||||
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
|
String baseUrl = String.format("%s://%s:%d%s", this.scheme, socketAddress.getHostString(), socketAddress.getPort(),
|
||||||
pathPrefix == null ? "" : '/' + pathPrefix);
|
pathPrefix == null ? "" : '/' + pathPrefix);
|
||||||
WebClient webClient = builder.baseUrl(baseUrl).filter((request, next) -> next.exchange(request).doOnError(errorListener)).build();
|
WebClient webClient = builder //
|
||||||
|
.baseUrl(baseUrl) //
|
||||||
|
.filter((request, next) -> next.exchange(request) //
|
||||||
|
.doOnError(errorListener)) //
|
||||||
|
.build(); //
|
||||||
return webClientConfigurer.apply(webClient);
|
return webClientConfigurer.apply(webClient);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,8 @@ class SingleNodeHostProvider implements HostProvider {
|
|||||||
private final InetSocketAddress endpoint;
|
private final InetSocketAddress endpoint;
|
||||||
private volatile ElasticsearchHost state;
|
private volatile ElasticsearchHost state;
|
||||||
|
|
||||||
SingleNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, InetSocketAddress endpoint) {
|
SingleNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
|
||||||
|
InetSocketAddress endpoint) {
|
||||||
|
|
||||||
this.clientProvider = clientProvider;
|
this.clientProvider = clientProvider;
|
||||||
this.headersSupplier = headersSupplier;
|
this.headersSupplier = headersSupplier;
|
||||||
@ -57,7 +58,7 @@ class SingleNodeHostProvider implements HostProvider {
|
|||||||
public Mono<ClusterInformation> clusterInfo() {
|
public Mono<ClusterInformation> clusterInfo() {
|
||||||
|
|
||||||
return createWebClient(endpoint) //
|
return createWebClient(endpoint) //
|
||||||
.head().uri("/")
|
.head().uri("/") //
|
||||||
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
|
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
|
||||||
.exchange() //
|
.exchange() //
|
||||||
.flatMap(it -> {
|
.flatMap(it -> {
|
||||||
@ -68,7 +69,6 @@ class SingleNodeHostProvider implements HostProvider {
|
|||||||
}
|
}
|
||||||
return it.releaseBody().thenReturn(state);
|
return it.releaseBody().thenReturn(state);
|
||||||
}).onErrorResume(throwable -> {
|
}).onErrorResume(throwable -> {
|
||||||
|
|
||||||
state = ElasticsearchHost.offline(endpoint);
|
state = ElasticsearchHost.offline(endpoint);
|
||||||
clientProvider.getErrorListener().accept(throwable);
|
clientProvider.getErrorListener().accept(throwable);
|
||||||
return Mono.just(state);
|
return Mono.just(state);
|
||||||
|
@ -2,8 +2,15 @@ package org.springframework.data.elasticsearch.client;
|
|||||||
|
|
||||||
import static com.github.tomakehurst.wiremock.client.WireMock.*;
|
import static com.github.tomakehurst.wiremock.client.WireMock.*;
|
||||||
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
|
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
|
||||||
|
import static io.specto.hoverfly.junit.dsl.HoverflyDsl.*;
|
||||||
|
import static io.specto.hoverfly.junit.verification.HoverflyVerifications.*;
|
||||||
import static org.assertj.core.api.Assertions.*;
|
import static org.assertj.core.api.Assertions.*;
|
||||||
|
|
||||||
|
import io.specto.hoverfly.junit.core.Hoverfly;
|
||||||
|
import io.specto.hoverfly.junit5.HoverflyExtension;
|
||||||
|
import io.specto.hoverfly.junit5.api.HoverflyCapture;
|
||||||
|
import io.specto.hoverfly.junit5.api.HoverflyConfig;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
@ -13,8 +20,8 @@ import java.util.stream.Stream;
|
|||||||
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RequestOptions;
|
||||||
import org.elasticsearch.client.RestClientBuilder;
|
import org.elasticsearch.client.RestClientBuilder;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.junit.jupiter.api.Disabled;
|
|
||||||
import org.junit.jupiter.api.DisplayName;
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
|
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
|
||||||
@ -27,40 +34,42 @@ import com.github.tomakehurst.wiremock.matching.AnythingPattern;
|
|||||||
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
|
import com.github.tomakehurst.wiremock.matching.EqualToPattern;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* We need hoverfly for testing the reactive code to use a proxy. Wiremock cannot intercept the proxy calls as WebClient
|
||||||
|
* uses HTTP CONNECT on proxy requests which wiremock does not support.
|
||||||
|
*
|
||||||
* @author Peter-Josef Meisch
|
* @author Peter-Josef Meisch
|
||||||
*/
|
*/
|
||||||
@Disabled("SocketException: Socket closed happens on the CLI build while running the test individually succeeds")
|
@HoverflyCapture(path = "target/hoverfly", config = @HoverflyConfig(proxyLocalHost = true, plainHttpTunneling = true))
|
||||||
|
@ExtendWith(HoverflyExtension.class)
|
||||||
public class RestClientsTest {
|
public class RestClientsTest {
|
||||||
|
|
||||||
@ParameterizedTest // DATAES-700
|
@ParameterizedTest // DATAES-700
|
||||||
@MethodSource("clientUnderTestFactorySource")
|
@MethodSource("clientUnderTestFactorySource")
|
||||||
@DisplayName("should use configured proxy")
|
@DisplayName("should use configured proxy")
|
||||||
void shouldUseConfiguredProxy(ClientUnderTestFactory clientUnderTestFactory) throws IOException {
|
void shouldUseConfiguredProxy(ClientUnderTestFactory clientUnderTestFactory, Hoverfly hoverfly) throws IOException {
|
||||||
|
|
||||||
if (clientUnderTestFactory instanceof ReactiveElasticsearchClientUnderTestFactory) {
|
|
||||||
// although the reactive code is using the proxy for every call - tested with an intercepting
|
|
||||||
// proxy - somehow in this test wiremock fails to register this. So we skip it here
|
|
||||||
//
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
wireMockServer(server -> {
|
wireMockServer(server -> {
|
||||||
|
|
||||||
|
// wiremock is the dummy server, hoverfly the proxy
|
||||||
WireMock.configureFor(server.port());
|
WireMock.configureFor(server.port());
|
||||||
|
|
||||||
stubFor(head(urlEqualTo("/")).willReturn(aResponse() //
|
stubFor(head(urlEqualTo("/")).willReturn(aResponse() //
|
||||||
.withHeader("Content-Type", "application/json; charset=UTF-8")));
|
.withHeader("Content-Type", "application/json; charset=UTF-8")));
|
||||||
|
|
||||||
|
String serviceHost = "localhost:" + server.port();
|
||||||
|
String proxyHost = "localhost:" + hoverfly.getHoverflyConfig().getProxyPort();
|
||||||
|
|
||||||
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
|
ClientConfigurationBuilder configurationBuilder = new ClientConfigurationBuilder();
|
||||||
ClientConfiguration clientConfiguration = configurationBuilder //
|
ClientConfiguration clientConfiguration = configurationBuilder //
|
||||||
.connectedTo("localhost:4711")//
|
.connectedTo(serviceHost)//
|
||||||
.withProxy("localhost:" + server.port()) //
|
.withProxy(proxyHost) //
|
||||||
.build();
|
.build();
|
||||||
ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration);
|
ClientUnderTest clientUnderTest = clientUnderTestFactory.create(clientConfiguration);
|
||||||
|
|
||||||
clientUnderTest.ping();
|
boolean result = clientUnderTest.ping();
|
||||||
|
|
||||||
|
assertThat(result).isTrue();
|
||||||
verify(headRequestedFor(urlEqualTo("/")));
|
verify(headRequestedFor(urlEqualTo("/")));
|
||||||
|
hoverfly.verify(service(serviceHost).head("/"), atLeast(1));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user