From 0df58615e9ebd12c3a3b639a19837f98367e6938 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 3 May 2020 04:25:36 +0200 Subject: [PATCH] DATAES-767 - Fix ReactiveElasticsearch handling of 4xx HTTP responses. (#446) Original PR: #445 (cherry picked from commit e605cad688a0e49875cf2624e7674319eb2e8c57) --- .../DefaultReactiveElasticsearchClient.java | 72 +++++++++++++++++-- .../ReactiveElasticsearchClientTests.java | 33 ++++----- .../ReactiveElasticsearchClientUnitTests.java | 5 +- .../ReactiveElasticsearchTemplateTests.java | 28 ++++---- ...eReactiveElasticsearchRepositoryTests.java | 25 +++++-- 5 files changed, 120 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 9f95c5b89..1c2492237 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -47,6 +47,7 @@ import java.util.function.Function; import javax.net.ssl.SSLContext; import org.apache.http.util.EntityUtils; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -115,6 +116,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; +import org.springframework.web.client.HttpClientErrorException; import org.springframework.web.client.HttpServerErrorException; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientRequest; @@ -201,7 +203,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } if (!soTimeout.isNegative()) { - tcpClient = tcpClient.doOnConnected(connection -> connection // .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)) .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))); @@ -447,6 +448,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest) */ + @Override public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) // @@ -680,6 +682,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch return handleServerError(request, response); } + if (response.statusCode().is4xxClientError()) { + + ClientLogger.logRawResponse(logId, response.statusCode()); + return handleClientError(logId, request, response, responseType); + } + return response.body(BodyExtractors.toMono(byte[].class)) // .map(it -> new String(it, StandardCharsets.UTF_8)) // .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) // @@ -716,13 +724,68 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); } - private static Publisher handleServerError(Request request, ClientResponse response) { + private Publisher handleServerError(Request request, ClientResponse response) { return Mono.error( new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.", request.getMethod(), request.getEndpoint(), response.statusCode().value()))); } + private Publisher handleClientError(String logId, Request request, ClientResponse response, + Class responseType) { + + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // + .flatMap(content -> { + String mediaType = response.headers().contentType().map(MediaType::toString) + .orElse(XContentType.JSON.mediaType()); + try { + ElasticsearchException exception = getElasticsearchException(response, content, mediaType); + if (exception != null) { + StringBuilder sb = new StringBuilder(); + buildExceptionMessages(sb, exception); + return Mono.error(new HttpClientErrorException(response.statusCode(), sb.toString())); + } + } catch (Exception e) { + return Mono + .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); + } + return Mono.just(content); + }) + .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) // + .flatMap(content -> doDecode(response, responseType, content)); + } + + // region ElasticsearchException helper + @Nullable + private ElasticsearchException getElasticsearchException(ClientResponse response, String content, String mediaType) + throws IOException { + + XContentParser parser = createParser(mediaType, content); + // we have a JSON object with an error and a status field + XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT + + do { + token = parser.nextToken(); + + if (parser.currentName().equals("error")) { + return ElasticsearchException.failureFromXContent(parser); + } + } while (token == XContentParser.Token.FIELD_NAME); + return null; + } + + private static void buildExceptionMessages(StringBuilder sb, Throwable t) { + + sb.append(t.getMessage()); + for (Throwable throwable : t.getSuppressed()) { + sb.append(", "); + buildExceptionMessages(sb, throwable); + } + } + // endregion + + // region internal classes static class RequestCreator { static Function search() { @@ -776,7 +839,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch try { return RequestConverters.deleteByQuery(request); } catch (IOException e) { - throw new ElasticsearchException("Could not parse request", e); + throw new org.springframework.data.elasticsearch.ElasticsearchException("Could not parse request", e); } }; } @@ -788,7 +851,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch try { return RequestConverters.bulk(request); } catch (IOException e) { - throw new ElasticsearchException("Could not parse request", e); + throw new org.springframework.data.elasticsearch.ElasticsearchException("Could not parse request", e); } }; } @@ -892,4 +955,5 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } } } + // endregion } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java index 8ef7a3b65..4565fa857 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientTests.java @@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.client.reactive; import static org.assertj.core.api.Assertions.*; import lombok.SneakyThrows; +import org.springframework.web.client.HttpClientErrorException; import reactor.test.StepVerifier; import java.io.IOException; @@ -148,7 +149,7 @@ public class ReactiveElasticsearchClientTests { client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")) // .as(StepVerifier::create) // - .expectError(ElasticsearchStatusException.class) // + .expectError(HttpClientErrorException.class) // .verify(); } @@ -320,7 +321,7 @@ public class ReactiveElasticsearchClientTests { client.index(request) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-488 @@ -369,7 +370,7 @@ public class ReactiveElasticsearchClientTests { client.update(request) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-488 @@ -536,7 +537,7 @@ public class ReactiveElasticsearchClientTests { client.indices().createIndex(request -> request.index(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -551,12 +552,12 @@ public class ReactiveElasticsearchClientTests { assertThat(syncClient.indices().exists(new GetIndexRequest().indices(INDEX_I), RequestOptions.DEFAULT)).isFalse(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void deleteNonExistingIndexErrors() { client.indices().deleteIndex(request -> request.indices(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -569,12 +570,12 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void openNonExistingIndex() { client.indices().openIndex(request -> request.indices(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -587,12 +588,12 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void closeNonExistingIndex() { client.indices().closeIndex(request -> request.indices(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -605,12 +606,12 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void refreshNonExistingIndex() { client.indices().refreshIndex(request -> request.indices(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -626,7 +627,7 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void updateMappingNonExistingIndex() { Map jsonMap = Collections.singletonMap("properties", @@ -634,7 +635,7 @@ public class ReactiveElasticsearchClientTests { client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-569 @@ -647,12 +648,12 @@ public class ReactiveElasticsearchClientTests { .verifyComplete(); } - @Test // DATAES-569 + @Test // DATAES-569, DATAES-767 public void flushNonExistingIndex() { client.indices().flushIndex(request -> request.indices(INDEX_I)) // .as(StepVerifier::create) // - .verifyError(ElasticsearchStatusException.class); + .verifyError(HttpClientErrorException.class); } @Test // DATAES-684 diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java index 4f4e0852c..7431bd3bb 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientUnitTests.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*; +import org.springframework.web.client.HttpClientErrorException; import org.elasticsearch.rest.RestStatus; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -456,7 +457,7 @@ public class ReactiveElasticsearchClientUnitTests { .verifyComplete(); } - @Test // DATAES-488 + @Test // DATAES-488, DATAES-767 public void updateShouldEmitErrorWhenNotFound() { hostProvider.when(HOST) // @@ -464,7 +465,7 @@ public class ReactiveElasticsearchClientUnitTests { client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl"))) .as(StepVerifier::create) // - .expectError(ElasticsearchStatusException.class) // + .expectError(HttpClientErrorException.class) // .verify(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 0326bddd0..01c768d66 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -24,6 +24,7 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.springframework.web.client.HttpClientErrorException; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -203,12 +204,12 @@ public class ReactiveElasticsearchTemplateTests { template.save(null); } - @Test // DATAES-519 - public void findByIdShouldCompleteWhenIndexDoesNotExist() { + @Test // DATAES-519, DATAES-767 + public void getByIdShouldCompleteWhenIndexDoesNotExist() { template.findById("foo", SampleEntity.class, "no-such-index") // .as(StepVerifier::create) // - .verifyComplete(); + .expectError(HttpClientErrorException.class); } @Test // DATAES-504 @@ -312,12 +313,12 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } - @Test // DATAES-519 - public void findShouldCompleteWhenIndexDoesNotExist() { + @Test // DATAES-519, DATAES-767 + public void searchShouldCompleteWhenIndexDoesNotExist() { template.find(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class, "no-such-index") // .as(StepVerifier::create) // - .verifyComplete(); + .expectError(HttpClientErrorException.class); } @Test // DATAES-504 @@ -410,7 +411,7 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } - @Test // DATAES-595 + @Test // DATAES-595, DATAES-767 public void shouldThrowElasticsearchStatusExceptionWhenInvalidPreferenceForGivenCriteria() { SampleEntity sampleEntity1 = randomEntity("test message"); @@ -425,7 +426,7 @@ public class ReactiveElasticsearchTemplateTests { template.find(queryWithInvalidPreference, SampleEntity.class) // .as(StepVerifier::create) // - .expectError(ElasticsearchStatusException.class).verify(); + .expectError(HttpClientErrorException.class).verify(); } @Test // DATAES-504 @@ -480,13 +481,12 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } - @Test // DATAES-519 + @Test // DATAES-519, DATAES-767 public void countShouldReturnZeroWhenIndexDoesNotExist() { template.count(SampleEntity.class) // .as(StepVerifier::create) // - .expectNext(0L) // - .verifyComplete(); + .expectError(HttpClientErrorException.class); } @Test // DATAES-504 @@ -513,12 +513,12 @@ public class ReactiveElasticsearchTemplateTests { .verifyComplete(); } - @Test // DATAES-519 - public void deleteByIdShouldCompleteWhenIndexDoesNotExist() { + @Test // DATAES-519, DATAES-767 + public void deleteShouldErrorWhenIndexDoesNotExist() { template.deleteById("does-not-exists", SampleEntity.class, "no-such-index") // .as(StepVerifier::create)// - .verifyComplete(); + .expectError(HttpClientErrorException.class); } @Test // DATAES-504 diff --git a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java index 13a728c63..47a7e1d9d 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java @@ -66,10 +66,11 @@ import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.StringUtils; +import org.springframework.web.client.HttpClientErrorException; /** * @author Christoph Strobl - * @currentRead Fool's Fate - Robin Hobb + * @author Peter-Josef Meisch */ @RunWith(SpringRunner.class) @ContextConfiguration @@ -95,6 +96,11 @@ public class SimpleReactiveElasticsearchRepositoryTests { TestUtils.deleteIndex(INDEX); } + @After + public void after() { + TestUtils.deleteIndex(INDEX); + } + @Test // DATAES-519 public void saveShouldSaveSingleEntity() { @@ -125,9 +131,11 @@ public class SimpleReactiveElasticsearchRepositoryTests { .verifyComplete(); } - @Test // DATAES-519 - public void findByIdShouldCompleteIfIndexDoesNotExist() { - repository.findById("id-two").as(StepVerifier::create).verifyComplete(); + @Test // DATAES-519, DATAES-767 + public void findByIdShouldErrorIfIndexDoesNotExist() { + repository.findById("id-two") // + .as(StepVerifier::create) // + .expectError(HttpClientErrorException.class); } @Test // DATAES-519 @@ -287,9 +295,12 @@ public class SimpleReactiveElasticsearchRepositoryTests { repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete(); } - @Test // DATAES-519 - public void deleteByIdShouldCompleteWhenIndexDoesNotExist() { - repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete(); + @Test // DATAES-519, DATAES-767 + public void deleteByIdShouldErrorWhenIndexDoesNotExist() { + repository.deleteById("does-not-exist") // + .as(StepVerifier::create) // + .verifyError(HttpClientErrorException.class); + ; } @Test // DATAES-519