diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 4f687c36e..092857e2e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -809,6 +809,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); return response.body(BodyExtractors.toMono(byte[].class)) // + .switchIfEmpty(Mono.error( + new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.", + request.getMethod(), request.getEndpoint(), statusCode), status))) .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // .flatMap(content -> contentOrError(content, mediaType, status)) .flatMap(unused -> Mono @@ -834,7 +837,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch /** * checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error. * Otherwise the content is returned in the Mono - * + * * @param content the content to analyze * @param mediaType the returned media type * @param status the response status @@ -855,7 +858,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch /** * tries to parse an {@link ElasticsearchException} from the given body content - * + * * @param content the content to analyse * @param mediaType the type of the body content * @return an {@link ElasticsearchException} or {@literal null}. diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java index 7e7000bd1..fad8da364 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,23 +19,31 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.search.internal.SearchContext.*; import static org.mockito.Mockito.*; -import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import java.net.URI; +import java.util.Optional; import java.util.function.Function; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Request; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.jupiter.api.BeforeEach; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.http.HttpStatus; import org.springframework.web.reactive.function.client.ClientResponse; -import reactor.test.StepVerifier; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriBuilder; /** * @author Peter-Josef Meisch @@ -46,29 +54,23 @@ class DefaultReactiveElasticsearchClientTest { @Mock private HostProvider hostProvider; @Mock private Function searchRequestConverter; + @Spy private RequestCreator requestCreator; - private DefaultReactiveElasticsearchClient client; + @Mock private WebClient webClient; - @BeforeEach - void setUp() { - client = new DefaultReactiveElasticsearchClient(hostProvider, new RequestCreator() { - @Override - public Function search() { - return searchRequestConverter; - } - }) { + @Test + void shouldSetAppropriateRequestParametersOnCount() { + + when(requestCreator.search()).thenReturn(searchRequestConverter); + SearchRequest searchRequest = new SearchRequest("someindex") // + .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); + + ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator) { @Override public Mono execute(ReactiveElasticsearchClientCallback callback) { return Mono.empty(); } }; - } - - @Test - void shouldSetAppropriateRequestParametersOnCount() { - - SearchRequest searchRequest = new SearchRequest("someindex") // - .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); client.count(searchRequest).as(StepVerifier::create).verifyComplete(); @@ -79,4 +81,31 @@ class DefaultReactiveElasticsearchClientTest { assertThat(source.trackTotalHitsUpTo()).isEqualTo(TRACK_TOTAL_HITS_ACCURATE); assertThat(source.fetchSource()).isEqualTo(FetchSourceContext.DO_NOT_FETCH_SOURCE); } + + @Test // #1712 + @DisplayName("should throw ElasticsearchStatusException on server 5xx with empty body") + void shouldThrowElasticsearchStatusExceptionOnServer5xxWithEmptyBody() { + + when(hostProvider.getActive(any())).thenReturn(Mono.just(webClient)); + WebClient.RequestBodyUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class); + when(requestBodyUriSpec.uri((Function) any())).thenReturn(requestBodyUriSpec); + when(requestBodyUriSpec.attribute(any(), any())).thenReturn(requestBodyUriSpec); + when(requestBodyUriSpec.headers(any())).thenReturn(requestBodyUriSpec); + when(webClient.method(any())).thenReturn(requestBodyUriSpec); + + ClientResponse clientResponse = mock(ClientResponse.class); + when(clientResponse.statusCode()).thenReturn(HttpStatus.SERVICE_UNAVAILABLE); + ClientResponse.Headers headers = mock(ClientResponse.Headers.class); + when(headers.contentType()).thenReturn(Optional.empty()); + when(clientResponse.headers()).thenReturn(headers); + when(clientResponse.body(any())).thenReturn(Mono.empty()); + when(requestBodyUriSpec.exchange()).thenReturn(Mono.just(clientResponse)); + + ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator); + + client.get(new GetRequest("42")) // + .as(StepVerifier::create) // + .expectError(ElasticsearchStatusException.class) // + .verify(); // + } }