DATAES-974 - Remove usage of deprecated WebClient exchange() method.

Original PR: #552
This commit is contained in:
Peter-Josef Meisch 2020-11-11 20:12:05 +01:00 committed by GitHub
parent ce82ae07b9
commit 88a8b84e5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 120 deletions

View File

@ -145,7 +145,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
*/ */
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices { public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
private final HostProvider hostProvider; private final HostProvider<?> hostProvider;
private final RequestCreator requestCreator; private final RequestCreator requestCreator;
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY; private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
@ -155,7 +155,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
* *
* @param hostProvider must not be {@literal null}. * @param hostProvider must not be {@literal null}.
*/ */
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) { public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider) {
this(hostProvider, new DefaultRequestCreator()); this(hostProvider, new DefaultRequestCreator());
} }
@ -166,7 +166,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
* @param hostProvider must not be {@literal null}. * @param hostProvider must not be {@literal null}.
* @param requestCreator must not be {@literal null}. * @param requestCreator must not be {@literal null}.
*/ */
public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) { public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider, RequestCreator requestCreator) {
Assert.notNull(hostProvider, "HostProvider must not be null"); Assert.notNull(hostProvider, "HostProvider must not be null");
Assert.notNull(requestCreator, "RequestCreator must not be null"); Assert.notNull(requestCreator, "RequestCreator must not be null");
@ -224,7 +224,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
WebClientProvider provider = getWebClientProvider(clientConfiguration); WebClientProvider provider = getWebClientProvider(clientConfiguration);
HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(), HostProvider<?> hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(),
clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0])); clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0]));
DefaultReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator); DefaultReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator);

View File

@ -34,19 +34,20 @@ import org.springframework.web.reactive.function.client.WebClient;
* *
* @author Christoph Strobl * @author Christoph Strobl
* @author Mark Paluch * @author Mark Paluch
* @author Peter-Josef Meisch
* @since 3.2 * @since 3.2
*/ */
public interface HostProvider { public interface HostProvider<T extends HostProvider<T>> {
/** /**
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts. * Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
* *
* @param clientProvider must not be {@literal null} . * @param clientProvider must not be {@literal null} .
* @param headersSupplier to supply custom headers, must not be {@literal null} * @param headersSupplier to supply custom headers, must not be {@literal null}
* @param endpoints must not be {@literal null} nor empty. * @param endpoints must not be {@literal null} nor empty.
* @return new instance of {@link HostProvider}. * @return new instance of {@link HostProvider}.
*/ */
static HostProvider provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, static HostProvider<?> provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
InetSocketAddress... endpoints) { InetSocketAddress... endpoints) {
Assert.notNull(clientProvider, "WebClientProvider must not be null"); Assert.notNull(clientProvider, "WebClientProvider must not be null");
@ -55,7 +56,7 @@ public interface HostProvider {
if (endpoints.length == 1) { if (endpoints.length == 1) {
return new SingleNodeHostProvider(clientProvider, headersSupplier, endpoints[0]); return new SingleNodeHostProvider(clientProvider, headersSupplier, endpoints[0]);
} else { } else {
return new MultiNodeHostProvider(clientProvider,headersSupplier, endpoints); return new MultiNodeHostProvider(clientProvider, headersSupplier, endpoints);
} }
} }

View File

@ -42,15 +42,17 @@ import org.springframework.web.reactive.function.client.WebClient;
* *
* @author Christoph Strobl * @author Christoph Strobl
* @author Mark Paluch * @author Mark Paluch
* @author Peter-Josef Meisch
* @since 3.2 * @since 3.2
*/ */
class MultiNodeHostProvider implements HostProvider { class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
private final WebClientProvider clientProvider; private final WebClientProvider clientProvider;
private final Supplier<HttpHeaders> headersSupplier; private final Supplier<HttpHeaders> headersSupplier;
private final Map<InetSocketAddress, ElasticsearchHost> hosts; private final Map<InetSocketAddress, ElasticsearchHost> hosts;
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, InetSocketAddress... endpoints) { MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
InetSocketAddress... endpoints) {
this.clientProvider = clientProvider; this.clientProvider = clientProvider;
this.headersSupplier = headersSupplier; this.headersSupplier = headersSupplier;
@ -136,16 +138,19 @@ class MultiNodeHostProvider implements HostProvider {
.map(ElasticsearchHost::getEndpoint) // .map(ElasticsearchHost::getEndpoint) //
.flatMap(host -> { .flatMap(host -> {
Mono<ClientResponse> exchange = createWebClient(host) // Mono<ClientResponse> clientResponseMono = createWebClient(host) //
.head().uri("/") // .head().uri("/") //
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) // .headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
.exchange().doOnError(throwable -> { .exchangeToMono(Mono::just) //
.doOnError(throwable -> {
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE)); hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
clientProvider.getErrorListener().accept(throwable); clientProvider.getErrorListener().accept(throwable);
}); });
return Mono.just(host).zipWith(exchange return Mono.just(host) //
.flatMap(it -> it.releaseBody().thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE))); .zipWith( //
clientResponseMono.flatMap(it -> it.releaseBody() //
.thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
}) // }) //
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable)); .onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
} }

View File

@ -32,9 +32,10 @@ import org.springframework.web.reactive.function.client.WebClient;
* *
* @author Christoph Strobl * @author Christoph Strobl
* @author Mark Paluch * @author Mark Paluch
* @author Peter-Josef Meisch
* @since 3.2 * @since 3.2
*/ */
class SingleNodeHostProvider implements HostProvider { class SingleNodeHostProvider implements HostProvider<SingleNodeHostProvider> {
private final WebClientProvider clientProvider; private final WebClientProvider clientProvider;
private final Supplier<HttpHeaders> headersSupplier; private final Supplier<HttpHeaders> headersSupplier;
@ -60,20 +61,18 @@ class SingleNodeHostProvider implements HostProvider {
return createWebClient(endpoint) // return createWebClient(endpoint) //
.head().uri("/") // .head().uri("/") //
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) // .headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
.exchange() // .exchangeToMono(it -> {
.flatMap(it -> {
if (it.statusCode().isError()) { if (it.statusCode().isError()) {
state = ElasticsearchHost.offline(endpoint); state = ElasticsearchHost.offline(endpoint);
} else { } else {
state = ElasticsearchHost.online(endpoint); state = ElasticsearchHost.online(endpoint);
} }
return it.releaseBody().thenReturn(state); return Mono.just(state);
}).onErrorResume(throwable -> { }).onErrorResume(throwable -> {
state = ElasticsearchHost.offline(endpoint); state = ElasticsearchHost.offline(endpoint);
clientProvider.getErrorListener().accept(throwable); clientProvider.getErrorListener().accept(throwable);
return Mono.just(state); return Mono.just(state);
}) // }).map(elasticsearchHost -> new ClusterInformation(Collections.singleton(elasticsearchHost)));
.map(it -> new ClusterInformation(Collections.singleton(it)));
} }
/* /*

View File

@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -30,8 +31,11 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClient
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive; import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import java.util.function.Function;
/** /**
* @author Christoph Strobl * @author Christoph Strobl
* @author Peter-Josef Meisch
*/ */
public class MultiNodeHostProviderUnitTests { public class MultiNodeHostProviderUnitTests {
@ -39,82 +43,85 @@ public class MultiNodeHostProviderUnitTests {
static final String HOST_2 = ":9201"; static final String HOST_2 = ":9201";
static final String HOST_3 = ":9202"; static final String HOST_3 = ":9202";
MockDelegatingElasticsearchHostProvider<MultiNodeHostProvider> mock; MockDelegatingElasticsearchHostProvider<MultiNodeHostProvider> multiNodeDelegatingHostProvider;
MultiNodeHostProvider provider; MultiNodeHostProvider delegateHostProvider;
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
mock = ReactiveMockClientTestsUtils.multi(HOST_1, HOST_2, HOST_3); multiNodeDelegatingHostProvider = ReactiveMockClientTestsUtils.multi(HOST_1, HOST_2, HOST_3);
provider = mock.getDelegate(); delegateHostProvider = multiNodeDelegatingHostProvider.getDelegate();
} }
@Test // DATAES-488 @Test // DATAES-488
public void refreshHostStateShouldUpdateNodeStateCorrectly() { public void refreshHostStateShouldUpdateNodeStateCorrectly() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok); multiNodeDelegatingHostProvider.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::ok); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::ok);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); delegateHostProvider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).containsExactly(State.OFFLINE, assertThat(delegateHostProvider.getCachedHostState()).extracting(ElasticsearchHost::getState)
State.ONLINE, State.ONLINE); .containsExactly(State.OFFLINE, State.ONLINE, State.ONLINE);
} }
@Test // DATAES-488 @Test // DATAES-488
public void getActiveReturnsFirstActiveHost() { public void getActiveReturnsFirstActiveHost() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok); multiNodeDelegatingHostProvider.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::error);
provider.getActive().as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); delegateHostProvider.getActive().as(StepVerifier::create).expectNext(multiNodeDelegatingHostProvider.client(HOST_2))
.verifyComplete();
} }
@Test // DATAES-488 @Test // DATAES-488
public void getActiveErrorsWhenNoActiveHostFound() { public void getActiveErrorsWhenNoActiveHostFound() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_2).receive(Receive::error);
mock.when(HOST_3).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::error);
provider.getActive().as(StepVerifier::create).expectError(IllegalStateException.class); delegateHostProvider.getActive().as(StepVerifier::create).expectError(IllegalStateException.class);
} }
@Test // DATAES-488 @Test // DATAES-488
public void lazyModeDoesNotResolveHostsTwice() { public void lazyModeDoesNotResolveHostsTwice() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok); multiNodeDelegatingHostProvider.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); delegateHostProvider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
provider.getActive(Verification.LAZY).as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); delegateHostProvider.getActive(Verification.LAZY).as(StepVerifier::create)
.expectNext(multiNodeDelegatingHostProvider.client(HOST_2)).verifyComplete();
verify(mock.client(":9201")).head(); verify(multiNodeDelegatingHostProvider.client(":9201")).head();
} }
@Test // DATAES-488 @Test // DATAES-488
public void alwaysModeDoesNotResolveHostsTwice() { public void alwaysModeDoesNotResolveHostsTwice() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok); multiNodeDelegatingHostProvider.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); delegateHostProvider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
provider.getActive(Verification.ACTIVE).as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); delegateHostProvider.getActive(Verification.ACTIVE).as(StepVerifier::create)
.expectNext(multiNodeDelegatingHostProvider.client(HOST_2)).verifyComplete();
verify(mock.client(HOST_2), times(2)).head(); verify(multiNodeDelegatingHostProvider.client(HOST_2), times(2)).head();
} }
@Test // DATAES-488 @Test // DATAES-488
public void triesDeadHostsIfNoActiveFound() { public void triesDeadHostsIfNoActiveFound() {
mock.when(HOST_1).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).get(requestHeadersUriSpec -> { multiNodeDelegatingHostProvider.when(HOST_2).get(requestHeadersUriSpec -> {
ClientResponse response1 = mock(ClientResponse.class); ClientResponse response1 = mock(ClientResponse.class);
when(response1.releaseBody()).thenReturn(Mono.empty()); when(response1.releaseBody()).thenReturn(Mono.empty());
@ -124,17 +131,29 @@ public class MultiNodeHostProviderUnitTests {
when(response2.releaseBody()).thenReturn(Mono.empty()); when(response2.releaseBody()).thenReturn(Mono.empty());
Receive.ok(response2); Receive.ok(response2);
when(requestHeadersUriSpec.exchange()).thenReturn(Mono.just(response1), Mono.just(response2)); when(requestHeadersUriSpec.exchangeToMono(any()))//
.thenAnswer(invocation -> getAnswer(invocation, response1)) //
.thenAnswer(invocation -> getAnswer(invocation, response2));
}); });
mock.when(HOST_3).receive(Receive::error); multiNodeDelegatingHostProvider.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete(); delegateHostProvider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).containsExactly(State.OFFLINE, assertThat(delegateHostProvider.getCachedHostState()).extracting(ElasticsearchHost::getState)
State.OFFLINE, State.OFFLINE); .containsExactly(State.OFFLINE, State.OFFLINE, State.OFFLINE);
provider.getActive().as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete(); delegateHostProvider.getActive().as(StepVerifier::create).expectNext(multiNodeDelegatingHostProvider.client(HOST_2))
.verifyComplete();
verify(mock.client(HOST_2), times(2)).head(); verify(multiNodeDelegatingHostProvider.client(HOST_2), times(2)).head();
}
private Mono<?> getAnswer(InvocationOnMock invocation, ClientResponse response) {
final Function<ClientResponse, ? extends Mono<?>> responseHandler = invocation.getArgument(0);
if (responseHandler != null) {
return responseHandler.apply(response);
}
return Mono.empty();
} }
} }

View File

@ -62,7 +62,7 @@ public class ReactiveElasticsearchClientUnitTests {
static final String HOST = ":9200"; static final String HOST = ":9200";
MockDelegatingElasticsearchHostProvider<HostProvider> hostProvider; MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> hostProvider;
ReactiveElasticsearchClient client; ReactiveElasticsearchClient client;
@BeforeEach @BeforeEach

View File

@ -75,24 +75,23 @@ public class ReactiveMockClientTestsUtils {
return provider(hosts); return provider(hosts);
} }
public static <T extends HostProvider> MockDelegatingElasticsearchHostProvider<T> provider(String... hosts) { public static <T extends HostProvider<T>> MockDelegatingElasticsearchHostProvider<T> provider(String... hosts) {
ErrorCollector errorCollector = new ErrorCollector(); ErrorCollector errorCollector = new ErrorCollector();
MockWebClientProvider clientProvider = new MockWebClientProvider(errorCollector); MockWebClientProvider clientProvider = new MockWebClientProvider(errorCollector);
HostProvider delegate = null; T delegate;
if (hosts.length == 1) { if (hosts.length == 1) {
// noinspection unchecked
delegate = new SingleNodeHostProvider(clientProvider, HttpHeaders::new, getInetSocketAddress(hosts[0])) {}; delegate = (T) new SingleNodeHostProvider(clientProvider, HttpHeaders::new, getInetSocketAddress(hosts[0])) {};
} else { } else {
// noinspection unchecked
delegate = new MultiNodeHostProvider(clientProvider,HttpHeaders::new, Arrays.stream(hosts) delegate = (T) new MultiNodeHostProvider(clientProvider, HttpHeaders::new, Arrays.stream(hosts)
.map(ReactiveMockClientTestsUtils::getInetSocketAddress).toArray(InetSocketAddress[]::new)) {}; .map(ReactiveMockClientTestsUtils::getInetSocketAddress).toArray(InetSocketAddress[]::new)) {};
} }
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate, return new MockDelegatingElasticsearchHostProvider<>(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
null); null);
} }
private static InetSocketAddress getInetSocketAddress(String hostAndPort) { private static InetSocketAddress getInetSocketAddress(String hostAndPort) {
@ -113,16 +112,18 @@ public class ReactiveMockClientTestsUtils {
} }
} }
public static class MockDelegatingElasticsearchHostProvider<T extends HostProvider> implements HostProvider { public static class MockDelegatingElasticsearchHostProvider<T extends HostProvider<T>> implements HostProvider<T> {
private final HttpHeaders httpHeaders;
private final T delegate; private final T delegate;
private final MockWebClientProvider clientProvider; private final MockWebClientProvider clientProvider;
private final ErrorCollector errorCollector; private final ErrorCollector errorCollector;
private @Nullable String activeDefaultHost; private @Nullable final String activeDefaultHost;
public MockDelegatingElasticsearchHostProvider(HttpHeaders httpHeaders, MockWebClientProvider clientProvider, public MockDelegatingElasticsearchHostProvider(HttpHeaders httpHeaders, MockWebClientProvider clientProvider,
ErrorCollector errorCollector, T delegate, String activeDefaultHost) { ErrorCollector errorCollector, T delegate, @Nullable String activeDefaultHost) {
this.httpHeaders = httpHeaders;
this.errorCollector = errorCollector; this.errorCollector = errorCollector;
this.clientProvider = clientProvider; this.clientProvider = clientProvider;
this.delegate = delegate; this.delegate = delegate;
@ -187,25 +188,23 @@ public class ReactiveMockClientTestsUtils {
} }
public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) { public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) {
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate, return new MockDelegatingElasticsearchHostProvider<>(httpHeaders, clientProvider, errorCollector, delegate, host);
host);
} }
} }
public static class MockWebClientProvider implements WebClientProvider { public static class MockWebClientProvider implements WebClientProvider {
private final Object lock = new Object();
private final Consumer<Throwable> errorListener; private final Consumer<Throwable> errorListener;
private Map<InetSocketAddress, WebClient> clientMap; private final Map<InetSocketAddress, WebClient> clientMap;
private Map<InetSocketAddress, RequestHeadersUriSpec> headersUriSpecMap; private final Map<InetSocketAddress, RequestHeadersUriSpec> headersUriSpecMap;
private Map<InetSocketAddress, RequestBodyUriSpec> bodyUriSpecMap; private final Map<InetSocketAddress, RequestBodyUriSpec> bodyUriSpecMap;
private Map<InetSocketAddress, ClientResponse> responseMap; private final Map<InetSocketAddress, ClientResponse> responseMap;
public MockWebClientProvider(Consumer<Throwable> errorListener) { public MockWebClientProvider(Consumer<Throwable> errorListener) {
this.errorListener = errorListener; this.errorListener = errorListener;
this.clientMap = new LinkedHashMap<>(); this.clientMap = new ConcurrentHashMap<>();
this.headersUriSpecMap = new LinkedHashMap<>(); this.headersUriSpecMap = new LinkedHashMap<>();
this.bodyUriSpecMap = new LinkedHashMap<>(); this.bodyUriSpecMap = new LinkedHashMap<>();
this.responseMap = new LinkedHashMap<>(); this.responseMap = new LinkedHashMap<>();
@ -218,40 +217,49 @@ public class ReactiveMockClientTestsUtils {
@Override @Override
public WebClient get(InetSocketAddress endpoint) { public WebClient get(InetSocketAddress endpoint) {
synchronized (lock) { return clientMap.computeIfAbsent(endpoint, key -> {
return clientMap.computeIfAbsent(endpoint, key -> { WebClient webClient = mock(WebClient.class);
WebClient webClient = mock(WebClient.class); RequestHeadersUriSpec headersUriSpec = mock(RequestHeadersUriSpec.class);
Mockito.when(headersUriSpec.uri(any(String.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.attribute(anyString(), anyString())).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.uri(any(Function.class))).thenReturn(headersUriSpec);
headersUriSpecMap.putIfAbsent(key, headersUriSpec);
RequestHeadersUriSpec headersUriSpec = mock(RequestHeadersUriSpec.class); ClientResponse response = mock(ClientResponse.class);
Mockito.when(webClient.get()).thenReturn(headersUriSpec); Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
Mockito.when(webClient.head()).thenReturn(headersUriSpec); Mockito.when(response.releaseBody()).thenReturn(Mono.empty());
Mockito.when(headersUriSpec.exchangeToMono(any())).thenAnswer(invocation -> {
final Function<ClientResponse, ? extends Mono<?>> responseHandler = invocation.getArgument(0);
Mockito.when(headersUriSpec.uri(any(String.class))).thenReturn(headersUriSpec); if (responseHandler != null) {
Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec); return responseHandler.apply(response);
Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec); }
Mockito.when(headersUriSpec.attribute(anyString(), anyString())).thenReturn(headersUriSpec); return Mono.empty();
Mockito.when(headersUriSpec.uri(any(Function.class))).thenReturn(headersUriSpec);
RequestBodyUriSpec bodySpy = spy(WebClient.create().method(HttpMethod.POST));
Mockito.when(webClient.method(any())).thenReturn(bodySpy);
Mockito.when(bodySpy.body(any())).thenReturn(headersUriSpec);
ClientResponse response = mock(ClientResponse.class);
Mockito.when(headersUriSpec.exchange()).thenReturn(Mono.just(response));
Mockito.when(bodySpy.exchange()).thenReturn(Mono.just(response));
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
Mockito.when(response.releaseBody()).thenReturn(Mono.empty());
headersUriSpecMap.putIfAbsent(key, headersUriSpec);
bodyUriSpecMap.putIfAbsent(key, bodySpy);
responseMap.putIfAbsent(key, response);
return webClient;
}); });
} responseMap.putIfAbsent(key, response);
RequestBodyUriSpec bodySpy = spy(WebClient.create().method(HttpMethod.POST));
Mockito.when(bodySpy.body(any())).thenReturn(headersUriSpec);
Mockito.when(bodySpy.exchangeToMono(any())).thenAnswer(invocation -> {
final Function<ClientResponse, ? extends Mono<?>> responseHandler = invocation.getArgument(0);
if (responseHandler != null) {
return responseHandler.apply(response);
}
return Mono.empty();
});
bodyUriSpecMap.putIfAbsent(key, bodySpy);
Mockito.when(webClient.get()).thenReturn(headersUriSpec);
Mockito.when(webClient.head()).thenReturn(headersUriSpec);
Mockito.when(webClient.method(any())).thenReturn(bodySpy);
return webClient;
});
} }
@Override @Override
@ -299,18 +307,20 @@ public class ReactiveMockClientTestsUtils {
WebClient client(); WebClient client();
} }
@SuppressWarnings("UnusedReturnValue")
public interface Send extends Receive, Client { public interface Send extends Receive, Client {
Receive get(Consumer<RequestHeadersUriSpec> headerSpec); Receive get(Consumer<RequestHeadersUriSpec<?>> headerSpec);
Receive exchange(Consumer<RequestBodyUriSpec> bodySpec); Receive exchange(Consumer<RequestBodyUriSpec> bodySpec);
default URI captureUri() { default URI captureUri() {
Set<URI> capturingSet = new LinkedHashSet(); Set<URI> capturingSet = new LinkedHashSet<>();
exchange(requestBodyUriSpec -> { exchange(requestBodyUriSpec -> {
// noinspection unchecked
ArgumentCaptor<Function<UriBuilder, URI>> fkt = ArgumentCaptor.forClass(Function.class); ArgumentCaptor<Function<UriBuilder, URI>> fkt = ArgumentCaptor.forClass(Function.class);
verify(requestBodyUriSpec).uri(fkt.capture()); verify(requestBodyUriSpec).uri(fkt.capture());
@ -354,9 +364,8 @@ public class ReactiveMockClientTestsUtils {
default Receive receiveGetByIdNotFound() { default Receive receiveGetByIdNotFound() {
return receiveJsonFromFile("get-by-id-no-hit") // return receiveJsonFromFile("get-by-id-no-hit") //
.receive(response -> { .receive(
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND); response -> Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND));
});
} }
default Receive receiveGetById() { default Receive receiveGetById() {
@ -380,9 +389,8 @@ public class ReactiveMockClientTestsUtils {
default Receive updateFail() { default Receive updateFail() {
return receiveJsonFromFile("update-error-not-found") // return receiveJsonFromFile("update-error-not-found") //
.receive(response -> { .receive(
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND); response -> Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND));
});
} }
default Receive receiveBulkOk() { default Receive receiveBulkOk() {
@ -445,14 +453,14 @@ public class ReactiveMockClientTestsUtils {
} }
} }
class CallbackImpl implements Send, Receive { static class CallbackImpl implements Send, Receive {
WebClient client; WebClient client;
RequestHeadersUriSpec headersUriSpec; RequestHeadersUriSpec<?> headersUriSpec;
RequestBodyUriSpec bodyUriSpec; RequestBodyUriSpec bodyUriSpec;
ClientResponse responseDelegate; ClientResponse responseDelegate;
public CallbackImpl(WebClient client, RequestHeadersUriSpec headersUriSpec, RequestBodyUriSpec bodyUriSpec, public CallbackImpl(WebClient client, RequestHeadersUriSpec<?> headersUriSpec, RequestBodyUriSpec bodyUriSpec,
ClientResponse responseDelegate) { ClientResponse responseDelegate) {
this.client = client; this.client = client;
@ -462,7 +470,7 @@ public class ReactiveMockClientTestsUtils {
} }
@Override @Override
public Receive get(Consumer<RequestHeadersUriSpec> uriSpec) { public Receive get(Consumer<RequestHeadersUriSpec<?>> uriSpec) {
uriSpec.accept(headersUriSpec); uriSpec.accept(headersUriSpec);
return this; return this;

View File

@ -30,6 +30,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClient
/** /**
* @author Christoph Strobl * @author Christoph Strobl
* @author Peter-Josef Meisch
*/ */
public class SingleNodeHostProviderUnitTests { public class SingleNodeHostProviderUnitTests {