diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index e1b87ab0e..4f687c36e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -804,53 +804,82 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch private Publisher handleServerError(Request request, ClientResponse response) { - RestStatus status = RestStatus.fromCode(response.statusCode().value()); + int statusCode = response.statusCode().value(); + RestStatus status = RestStatus.fromCode(statusCode); + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); - return Mono.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.", - request.getMethod(), request.getEndpoint(), response.statusCode().value()), status)); + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // + .flatMap(content -> contentOrError(content, mediaType, status)) + .flatMap(unused -> Mono + .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.", + request.getMethod(), request.getEndpoint(), statusCode), status))); } private Publisher handleClientError(String logId, Request request, ClientResponse response, Class responseType) { + int statusCode = response.statusCode().value(); + RestStatus status = RestStatus.fromCode(statusCode); + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); + return response.body(BodyExtractors.toMono(byte[].class)) // .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // - .flatMap(content -> { - String mediaType = response.headers().contentType().map(MediaType::toString) - .orElse(XContentType.JSON.mediaType()); - RestStatus status = RestStatus.fromCode(response.statusCode().value()); - try { - ElasticsearchException exception = getElasticsearchException(response, content, mediaType); - if (exception != null) { - StringBuilder sb = new StringBuilder(); - buildExceptionMessages(sb, exception); - return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception)); - } - } catch (Exception e) { - return Mono.error(new ElasticsearchStatusException(content, status)); - } - return Mono.just(content); - }).doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) // + .flatMap(content -> contentOrError(content, mediaType, status)) // + .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) // .flatMap(content -> doDecode(response, responseType, content)); } // region ElasticsearchException helper + /** + * checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error. + * Otherwise the content is returned in the Mono + * + * @param content the content to analyze + * @param mediaType the returned media type + * @param status the response status + * @return a Mono with the content or an Mono.error + */ + private static Mono contentOrError(String content, String mediaType, RestStatus status) { + + ElasticsearchException exception = getElasticsearchException(content, mediaType, status); + + if (exception != null) { + StringBuilder sb = new StringBuilder(); + buildExceptionMessages(sb, exception); + return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception)); + } + + return Mono.just(content); + } + + /** + * tries to parse an {@link ElasticsearchException} from the given body content + * + * @param content the content to analyse + * @param mediaType the type of the body content + * @return an {@link ElasticsearchException} or {@literal null}. + */ @Nullable - private ElasticsearchException getElasticsearchException(ClientResponse response, String content, String mediaType) - throws IOException { + private static ElasticsearchException getElasticsearchException(String content, String mediaType, RestStatus status) { - XContentParser parser = createParser(mediaType, content); - // we have a JSON object with an error and a status field - XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT + try { + XContentParser parser = createParser(mediaType, content); + // we have a JSON object with an error and a status field + XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT - do { - token = parser.nextToken(); + do { + token = parser.nextToken(); - if (parser.currentName().equals("error")) { - return ElasticsearchException.failureFromXContent(parser); - } - } while (token == XContentParser.Token.FIELD_NAME); - return null; + if (parser.currentName().equals("error")) { + return ElasticsearchException.failureFromXContent(parser); + } + } while (token == XContentParser.Token.FIELD_NAME); + + return null; + } catch (IOException e) { + return new ElasticsearchStatusException(content, status); + } } private static void buildExceptionMessages(StringBuilder sb, Throwable t) {