DATAES-767 - Fix ReactiveElasticsearch handling of 4xx HTTP responses. (#446)

Original PR: #445

(cherry picked from commit e605cad688a0e49875cf2624e7674319eb2e8c57)
This commit is contained in:
Peter-Josef Meisch 2020-05-03 04:25:36 +02:00 committed by GitHub
parent a69658dc8b
commit 0df58615e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 43 deletions

View File

@ -47,6 +47,7 @@ import java.util.function.Function;
import javax.net.ssl.SSLContext;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -115,6 +116,7 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientRequest;
@ -201,7 +203,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
if (!soTimeout.isNegative()) {
tcpClient = tcpClient.doOnConnected(connection -> connection //
.addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
@ -447,6 +448,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
*/
@Override
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
@ -680,6 +682,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return handleServerError(request, response);
}
if (response.statusCode().is4xxClientError()) {
ClientLogger.logRawResponse(logId, response.statusCode());
return handleClientError(logId, request, response, responseType);
}
return response.body(BodyExtractors.toMono(byte[].class)) //
.map(it -> new String(it, StandardCharsets.UTF_8)) //
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
@ -716,13 +724,68 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
}
private static <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
private <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
return Mono.error(
new HttpServerErrorException(response.statusCode(), String.format("%s request to %s returned error code %s.",
request.getMethod(), request.getEndpoint(), response.statusCode().value())));
}
private <T> Publisher<? extends T> handleClientError(String logId, Request request, ClientResponse response,
Class<T> responseType) {
return response.body(BodyExtractors.toMono(byte[].class)) //
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
.flatMap(content -> {
String mediaType = response.headers().contentType().map(MediaType::toString)
.orElse(XContentType.JSON.mediaType());
try {
ElasticsearchException exception = getElasticsearchException(response, content, mediaType);
if (exception != null) {
StringBuilder sb = new StringBuilder();
buildExceptionMessages(sb, exception);
return Mono.error(new HttpClientErrorException(response.statusCode(), sb.toString()));
}
} catch (Exception e) {
return Mono
.error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
}
return Mono.just(content);
})
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
.flatMap(content -> doDecode(response, responseType, content));
}
// region ElasticsearchException helper
@Nullable
private ElasticsearchException getElasticsearchException(ClientResponse response, String content, String mediaType)
throws IOException {
XContentParser parser = createParser(mediaType, content);
// we have a JSON object with an error and a status field
XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT
do {
token = parser.nextToken();
if (parser.currentName().equals("error")) {
return ElasticsearchException.failureFromXContent(parser);
}
} while (token == XContentParser.Token.FIELD_NAME);
return null;
}
private static void buildExceptionMessages(StringBuilder sb, Throwable t) {
sb.append(t.getMessage());
for (Throwable throwable : t.getSuppressed()) {
sb.append(", ");
buildExceptionMessages(sb, throwable);
}
}
// endregion
// region internal classes
static class RequestCreator {
static Function<SearchRequest, Request> search() {
@ -776,7 +839,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
try {
return RequestConverters.deleteByQuery(request);
} catch (IOException e) {
throw new ElasticsearchException("Could not parse request", e);
throw new org.springframework.data.elasticsearch.ElasticsearchException("Could not parse request", e);
}
};
}
@ -788,7 +851,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
try {
return RequestConverters.bulk(request);
} catch (IOException e) {
throw new ElasticsearchException("Could not parse request", e);
throw new org.springframework.data.elasticsearch.ElasticsearchException("Could not parse request", e);
}
};
}
@ -892,4 +955,5 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
}
}
// endregion
}

View File

@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import lombok.SneakyThrows;
import org.springframework.web.client.HttpClientErrorException;
import reactor.test.StepVerifier;
import java.io.IOException;
@ -148,7 +149,7 @@ public class ReactiveElasticsearchClientTests {
client.get(new GetRequest(INDEX_I, TYPE_I, "nonono")) //
.as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class) //
.expectError(HttpClientErrorException.class) //
.verify();
}
@ -320,7 +321,7 @@ public class ReactiveElasticsearchClientTests {
client.index(request) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-488
@ -369,7 +370,7 @@ public class ReactiveElasticsearchClientTests {
client.update(request) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-488
@ -536,7 +537,7 @@ public class ReactiveElasticsearchClientTests {
client.indices().createIndex(request -> request.index(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -551,12 +552,12 @@ public class ReactiveElasticsearchClientTests {
assertThat(syncClient.indices().exists(new GetIndexRequest().indices(INDEX_I), RequestOptions.DEFAULT)).isFalse();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void deleteNonExistingIndexErrors() {
client.indices().deleteIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -569,12 +570,12 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void openNonExistingIndex() {
client.indices().openIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -587,12 +588,12 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void closeNonExistingIndex() {
client.indices().closeIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -605,12 +606,12 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void refreshNonExistingIndex() {
client.indices().refreshIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -626,7 +627,7 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void updateMappingNonExistingIndex() {
Map<String, Object> jsonMap = Collections.singletonMap("properties",
@ -634,7 +635,7 @@ public class ReactiveElasticsearchClientTests {
client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-569
@ -647,12 +648,12 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-767
public void flushNonExistingIndex() {
client.indices().flushIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.verifyError(ElasticsearchStatusException.class);
.verifyError(HttpClientErrorException.class);
}
@Test // DATAES-684

View File

@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockWebClientProvider.Receive.*;
import org.springframework.web.client.HttpClientErrorException;
import org.elasticsearch.rest.RestStatus;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -456,7 +457,7 @@ public class ReactiveElasticsearchClientUnitTests {
.verifyComplete();
}
@Test // DATAES-488
@Test // DATAES-488, DATAES-767
public void updateShouldEmitErrorWhenNotFound() {
hostProvider.when(HOST) //
@ -464,7 +465,7 @@ public class ReactiveElasticsearchClientUnitTests {
client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl")))
.as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class) //
.expectError(HttpClientErrorException.class) //
.verify();
}

View File

@ -24,6 +24,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.springframework.web.client.HttpClientErrorException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -203,12 +204,12 @@ public class ReactiveElasticsearchTemplateTests {
template.save(null);
}
@Test // DATAES-519
public void findByIdShouldCompleteWhenIndexDoesNotExist() {
@Test // DATAES-519, DATAES-767
public void getByIdShouldCompleteWhenIndexDoesNotExist() {
template.findById("foo", SampleEntity.class, "no-such-index") //
.as(StepVerifier::create) //
.verifyComplete();
.expectError(HttpClientErrorException.class);
}
@Test // DATAES-504
@ -312,12 +313,12 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void findShouldCompleteWhenIndexDoesNotExist() {
@Test // DATAES-519, DATAES-767
public void searchShouldCompleteWhenIndexDoesNotExist() {
template.find(new CriteriaQuery(Criteria.where("message").is("some message")), SampleEntity.class, "no-such-index") //
.as(StepVerifier::create) //
.verifyComplete();
.expectError(HttpClientErrorException.class);
}
@Test // DATAES-504
@ -410,7 +411,7 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-595
@Test // DATAES-595, DATAES-767
public void shouldThrowElasticsearchStatusExceptionWhenInvalidPreferenceForGivenCriteria() {
SampleEntity sampleEntity1 = randomEntity("test message");
@ -425,7 +426,7 @@ public class ReactiveElasticsearchTemplateTests {
template.find(queryWithInvalidPreference, SampleEntity.class) //
.as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class).verify();
.expectError(HttpClientErrorException.class).verify();
}
@Test // DATAES-504
@ -480,13 +481,12 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
@Test // DATAES-519, DATAES-767
public void countShouldReturnZeroWhenIndexDoesNotExist() {
template.count(SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
.expectError(HttpClientErrorException.class);
}
@Test // DATAES-504
@ -513,12 +513,12 @@ public class ReactiveElasticsearchTemplateTests {
.verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldCompleteWhenIndexDoesNotExist() {
@Test // DATAES-519, DATAES-767
public void deleteShouldErrorWhenIndexDoesNotExist() {
template.deleteById("does-not-exists", SampleEntity.class, "no-such-index") //
.as(StepVerifier::create)//
.verifyComplete();
.expectError(HttpClientErrorException.class);
}
@Test // DATAES-504

View File

@ -66,10 +66,11 @@ import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
* @author Peter-Josef Meisch
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@ -95,6 +96,11 @@ public class SimpleReactiveElasticsearchRepositoryTests {
TestUtils.deleteIndex(INDEX);
}
@After
public void after() {
TestUtils.deleteIndex(INDEX);
}
@Test // DATAES-519
public void saveShouldSaveSingleEntity() {
@ -125,9 +131,11 @@ public class SimpleReactiveElasticsearchRepositoryTests {
.verifyComplete();
}
@Test // DATAES-519
public void findByIdShouldCompleteIfIndexDoesNotExist() {
repository.findById("id-two").as(StepVerifier::create).verifyComplete();
@Test // DATAES-519, DATAES-767
public void findByIdShouldErrorIfIndexDoesNotExist() {
repository.findById("id-two") //
.as(StepVerifier::create) //
.expectError(HttpClientErrorException.class);
}
@Test // DATAES-519
@ -287,9 +295,12 @@ public class SimpleReactiveElasticsearchRepositoryTests {
repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519
public void deleteByIdShouldCompleteWhenIndexDoesNotExist() {
repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete();
@Test // DATAES-519, DATAES-767
public void deleteByIdShouldErrorWhenIndexDoesNotExist() {
repository.deleteById("does-not-exist") //
.as(StepVerifier::create) //
.verifyError(HttpClientErrorException.class);
;
}
@Test // DATAES-519