mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-31 09:12:11 +00:00
parent
384e52b2c3
commit
3c44a1c969
@ -804,53 +804,82 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
|
||||
private <T> Publisher<? extends T> handleServerError(Request request, ClientResponse response) {
|
||||
|
||||
RestStatus status = RestStatus.fromCode(response.statusCode().value());
|
||||
int statusCode = response.statusCode().value();
|
||||
RestStatus status = RestStatus.fromCode(statusCode);
|
||||
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
|
||||
|
||||
return Mono.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
|
||||
request.getMethod(), request.getEndpoint(), response.statusCode().value()), status));
|
||||
return response.body(BodyExtractors.toMono(byte[].class)) //
|
||||
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
|
||||
.flatMap(content -> contentOrError(content, mediaType, status))
|
||||
.flatMap(unused -> Mono
|
||||
.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
|
||||
request.getMethod(), request.getEndpoint(), statusCode), status)));
|
||||
}
|
||||
|
||||
private <T> Publisher<? extends T> handleClientError(String logId, Request request, ClientResponse response,
|
||||
Class<T> responseType) {
|
||||
|
||||
int statusCode = response.statusCode().value();
|
||||
RestStatus status = RestStatus.fromCode(statusCode);
|
||||
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
|
||||
|
||||
return response.body(BodyExtractors.toMono(byte[].class)) //
|
||||
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
|
||||
.flatMap(content -> {
|
||||
String mediaType = response.headers().contentType().map(MediaType::toString)
|
||||
.orElse(XContentType.JSON.mediaType());
|
||||
RestStatus status = RestStatus.fromCode(response.statusCode().value());
|
||||
try {
|
||||
ElasticsearchException exception = getElasticsearchException(response, content, mediaType);
|
||||
if (exception != null) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
buildExceptionMessages(sb, exception);
|
||||
return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return Mono.error(new ElasticsearchStatusException(content, status));
|
||||
}
|
||||
return Mono.just(content);
|
||||
}).doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
|
||||
.flatMap(content -> contentOrError(content, mediaType, status)) //
|
||||
.doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) //
|
||||
.flatMap(content -> doDecode(response, responseType, content));
|
||||
}
|
||||
|
||||
// region ElasticsearchException helper
|
||||
/**
|
||||
* checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error.
|
||||
* Otherwise the content is returned in the Mono
|
||||
*
|
||||
* @param content the content to analyze
|
||||
* @param mediaType the returned media type
|
||||
* @param status the response status
|
||||
* @return a Mono with the content or an Mono.error
|
||||
*/
|
||||
private static Mono<String> contentOrError(String content, String mediaType, RestStatus status) {
|
||||
|
||||
ElasticsearchException exception = getElasticsearchException(content, mediaType, status);
|
||||
|
||||
if (exception != null) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
buildExceptionMessages(sb, exception);
|
||||
return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception));
|
||||
}
|
||||
|
||||
return Mono.just(content);
|
||||
}
|
||||
|
||||
/**
|
||||
* tries to parse an {@link ElasticsearchException} from the given body content
|
||||
*
|
||||
* @param content the content to analyse
|
||||
* @param mediaType the type of the body content
|
||||
* @return an {@link ElasticsearchException} or {@literal null}.
|
||||
*/
|
||||
@Nullable
|
||||
private ElasticsearchException getElasticsearchException(ClientResponse response, String content, String mediaType)
|
||||
throws IOException {
|
||||
private static ElasticsearchException getElasticsearchException(String content, String mediaType, RestStatus status) {
|
||||
|
||||
XContentParser parser = createParser(mediaType, content);
|
||||
// we have a JSON object with an error and a status field
|
||||
XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT
|
||||
try {
|
||||
XContentParser parser = createParser(mediaType, content);
|
||||
// we have a JSON object with an error and a status field
|
||||
XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT
|
||||
|
||||
do {
|
||||
token = parser.nextToken();
|
||||
do {
|
||||
token = parser.nextToken();
|
||||
|
||||
if (parser.currentName().equals("error")) {
|
||||
return ElasticsearchException.failureFromXContent(parser);
|
||||
}
|
||||
} while (token == XContentParser.Token.FIELD_NAME);
|
||||
return null;
|
||||
if (parser.currentName().equals("error")) {
|
||||
return ElasticsearchException.failureFromXContent(parser);
|
||||
}
|
||||
} while (token == XContentParser.Token.FIELD_NAME);
|
||||
|
||||
return null;
|
||||
} catch (IOException e) {
|
||||
return new ElasticsearchStatusException(content, status);
|
||||
}
|
||||
}
|
||||
|
||||
private static void buildExceptionMessages(StringBuilder sb, Throwable t) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user