Expose search shard statistics in search hits.

Original Pull Request #2806
Closes #2605
This commit is contained in:
puppylpg 2023-12-28 19:57:44 +08:00 committed by GitHub
parent 6350514e7e
commit 433d52981e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 266 additions and 33 deletions

View File

@ -49,6 +49,7 @@ import org.springframework.util.Assert;
* {@link org.springframework.data.elasticsearch.core.document.Document}
*
* @author Peter-Josef Meisch
* @author Haibo Liu
* @since 4.4
*/
final class DocumentAdapters {
@ -73,7 +74,7 @@ final class DocumentAdapters {
Map<String, SearchDocumentResponse> innerHits = new LinkedHashMap<>();
hit.innerHits().forEach((name, innerHitsResult) -> {
// noinspection ReturnOfNull
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, null,
innerHits.put(name, SearchDocumentResponseBuilder.from(innerHitsResult.hits(), null, null, null, null, null,
searchDocument -> null, jsonpMapper));
});

View File

@ -541,7 +541,7 @@ class ResponseConverter {
}
@Nullable
private static ElasticsearchErrorCause toErrorCause(@Nullable ErrorCause errorCause) {
static ElasticsearchErrorCause toErrorCause(@Nullable ErrorCause errorCause) {
if (errorCause != null) {
return new ElasticsearchErrorCause( //

View File

@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.ShardFailure;
import co.elastic.clients.elasticsearch._types.ShardStatistics;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.SearchTemplateResponse;
@ -36,6 +38,7 @@ import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.data.elasticsearch.core.SearchShardStatistics;
import org.springframework.data.elasticsearch.core.TotalHitsRelation;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
@ -52,6 +55,7 @@ import org.springframework.util.CollectionUtils;
* Factory class to create {@link SearchDocumentResponse} instances.
*
* @author Peter-Josef Meisch
* @author Haibo Liu
* @since 4.4
*/
class SearchDocumentResponseBuilder {
@ -78,8 +82,9 @@ class SearchDocumentResponseBuilder {
Map<String, Aggregate> aggregations = responseBody.aggregations();
Map<String, List<Suggestion<EntityAsMap>>> suggest = responseBody.suggest();
var pointInTimeId = responseBody.pitId();
var shards = responseBody.shards();
return from(hitsMetadata, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
return from(hitsMetadata, shards, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
}
/**
@ -98,13 +103,14 @@ class SearchDocumentResponseBuilder {
Assert.notNull(entityCreator, "entityCreator must not be null");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");
var shards = response.shards();
var hitsMetadata = response.hits();
var scrollId = response.scrollId();
var aggregations = response.aggregations();
var suggest = response.suggest();
var pointInTimeId = response.pitId();
return from(hitsMetadata, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
return from(hitsMetadata, shards, scrollId, pointInTimeId, aggregations, suggest, entityCreator, jsonpMapper);
}
/**
@ -120,8 +126,8 @@ class SearchDocumentResponseBuilder {
* @param jsonpMapper to map JsonData objects
* @return the {@link SearchDocumentResponse}
*/
public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nullable String scrollId,
@Nullable String pointInTimeId, @Nullable Map<String, Aggregate> aggregations,
public static <T> SearchDocumentResponse from(HitsMetadata<?> hitsMetadata, @Nullable ShardStatistics shards,
@Nullable String scrollId, @Nullable String pointInTimeId, @Nullable Map<String, Aggregate> aggregations,
Map<String, List<Suggestion<EntityAsMap>>> suggestES, SearchDocumentResponse.EntityCreator<T> entityCreator,
JsonpMapper jsonpMapper) {
@ -155,8 +161,19 @@ class SearchDocumentResponseBuilder {
Suggest suggest = suggestFrom(suggestES, entityCreator);
SearchShardStatistics shardStatistics = shards != null ? shardsFrom(shards) : null;
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchDocuments,
aggregationsContainer, suggest);
aggregationsContainer, suggest, shardStatistics);
}
private static SearchShardStatistics shardsFrom(ShardStatistics shards) {
List<ShardFailure> failures = shards.failures();
List<SearchShardStatistics.Failure> searchFailures = failures.stream()
.map(f -> SearchShardStatistics.Failure.of(f.index(), f.node(), f.status(), f.shard(), null,
ResponseConverter.toErrorCause(f.reason())))
.toList();
return SearchShardStatistics.of(shards.failed(), shards.successful(), shards.total(), shards.skipped(), searchFailures);
}
@Nullable

View File

@ -46,6 +46,7 @@ import org.springframework.util.Assert;
* @author Matt Gilene
* @author Sascha Woo
* @author Jakob Hoeper
* @author Haibo Liu
* @since 4.0
*/
public class SearchHitMapping<T> {
@ -84,6 +85,7 @@ public class SearchHitMapping<T> {
"Count of documents must match the count of entities");
long totalHits = searchDocumentResponse.getTotalHits();
SearchShardStatistics shardStatistics = searchDocumentResponse.getSearchShardStatistics();
float maxScore = searchDocumentResponse.getMaxScore();
String scrollId = searchDocumentResponse.getScrollId();
String pointInTimeId = searchDocumentResponse.getPointInTimeId();
@ -103,7 +105,7 @@ public class SearchHitMapping<T> {
mapHitsInCompletionSuggestion(suggest);
return new SearchHitsImpl<>(totalHits, totalHitsRelation, maxScore, scrollId, pointInTimeId, searchHits,
aggregations, suggest);
aggregations, suggest, shardStatistics);
}
@SuppressWarnings("unchecked")
@ -240,7 +242,8 @@ public class SearchHitMapping<T> {
searchHits.getPointInTimeId(), //
convertedSearchHits, //
searchHits.getAggregations(), //
searchHits.getSuggest());
searchHits.getSuggest(),
searchHits.getSearchShardStatistics());
}
} catch (Exception e) {
throw new UncategorizedElasticsearchException("Unable to convert inner hits.", e);

View File

@ -27,6 +27,7 @@ import org.springframework.lang.Nullable;
*
* @param <T> the result data class.
* @author Sascha Woo
* @author Haibo Liu
* @since 4.0
*/
public interface SearchHits<T> extends Streamable<SearchHit<T>> {
@ -108,4 +109,10 @@ public interface SearchHits<T> extends Streamable<SearchHit<T>> {
*/
@Nullable
String getPointInTimeId();
/**
* @return shard statistics for the search hit.
*/
@Nullable
SearchShardStatistics getSearchShardStatistics();
}

View File

@ -29,6 +29,7 @@ import org.springframework.util.Assert;
* @param <T> the result data class.
* @author Peter-Josef Meisch
* @author Sascha Woo
* @author Haibo Liu
* @since 4.0
*/
public class SearchHitsImpl<T> implements SearchScrollHits<T> {
@ -42,6 +43,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
@Nullable private final AggregationsContainer<?> aggregations;
@Nullable private final Suggest suggest;
@Nullable private String pointInTimeId;
@Nullable private final SearchShardStatistics searchShardStatistics;
/**
* @param totalHits the number of total hits for the search
@ -53,7 +55,8 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
*/
public SearchHitsImpl(long totalHits, TotalHitsRelation totalHitsRelation, float maxScore, @Nullable String scrollId,
@Nullable String pointInTimeId, List<? extends SearchHit<T>> searchHits,
@Nullable AggregationsContainer<?> aggregations, @Nullable Suggest suggest) {
@Nullable AggregationsContainer<?> aggregations, @Nullable Suggest suggest,
@Nullable SearchShardStatistics searchShardStatistics) {
Assert.notNull(searchHits, "searchHits must not be null");
@ -66,6 +69,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
this.aggregations = aggregations;
this.suggest = suggest;
this.unmodifiableSearchHits = Lazy.of(() -> Collections.unmodifiableList(searchHits));
this.searchShardStatistics = searchShardStatistics;
}
// region getter
@ -118,6 +122,11 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
return pointInTimeId;
}
@Override
public SearchShardStatistics getSearchShardStatistics() {
return searchShardStatistics;
}
@Override
public String toString() {
return "SearchHits{" + //
@ -128,6 +137,7 @@ public class SearchHitsImpl<T> implements SearchScrollHits<T> {
", pointInTimeId='" + pointInTimeId + '\'' + //
", searchHits={" + searchHits.size() + " elements}" + //
", aggregations=" + aggregations + //
", shardStatistics=" + searchShardStatistics + //
'}';
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import java.util.List;
import org.springframework.data.elasticsearch.ElasticsearchErrorCause;
import org.springframework.lang.Nullable;
/**
* @author Haibo Liu
* @since 5.3
*/
public class SearchShardStatistics {
private final Number failed;
private final Number successful;
private final Number total;
@Nullable private final Number skipped;
private final List<Failure> failures;
private SearchShardStatistics(Number failed, Number successful, Number total, @Nullable Number skipped,
List<Failure> failures) {
this.failed = failed;
this.successful = successful;
this.total = total;
this.skipped = skipped;
this.failures = failures;
}
public static SearchShardStatistics of(Number failed, Number successful, Number total, @Nullable Number skipped,
List<Failure> failures) {
return new SearchShardStatistics(failed, successful, total, skipped, failures);
}
public Number getFailed() {
return failed;
}
public Number getSuccessful() {
return successful;
}
public Number getTotal() {
return total;
}
@Nullable
public Number getSkipped() {
return skipped;
}
public boolean isFailed() {
return failed.intValue() > 0;
}
public List<Failure> getFailures() {
return failures;
}
public static class Failure {
@Nullable private final String index;
@Nullable private final String node;
@Nullable private final String status;
private final int shard;
@Nullable private final Exception exception;
@Nullable private final ElasticsearchErrorCause elasticsearchErrorCause;
private Failure(@Nullable String index, @Nullable String node, @Nullable String status, int shard,
@Nullable Exception exception, @Nullable ElasticsearchErrorCause elasticsearchErrorCause) {
this.index = index;
this.node = node;
this.status = status;
this.shard = shard;
this.exception = exception;
this.elasticsearchErrorCause = elasticsearchErrorCause;
}
public static SearchShardStatistics.Failure of(@Nullable String index, @Nullable String node,
@Nullable String status, int shard, @Nullable Exception exception,
@Nullable ElasticsearchErrorCause elasticsearchErrorCause) {
return new SearchShardStatistics.Failure(index, node, status, shard, exception, elasticsearchErrorCause);
}
@Nullable
public String getIndex() {
return index;
}
@Nullable
public String getNode() {
return node;
}
@Nullable
public String getStatus() {
return status;
}
@Nullable
public Exception getException() {
return exception;
}
public int getShard() {
return shard;
}
@Nullable
public ElasticsearchErrorCause getElasticsearchErrorCause() {
return elasticsearchErrorCause;
}
}
}

View File

@ -129,7 +129,7 @@ public class ElasticsearchCustomConversions extends CustomConversions {
@WritingConverter
enum ByteArrayToBase64Converter implements Converter<byte[], String> {
INSTANCE,;
INSTANCE;
@Override
public String convert(byte[] source) {

View File

@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.data.elasticsearch.core.SearchShardStatistics;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.lang.Nullable;
@ -27,6 +28,7 @@ import org.springframework.lang.Nullable;
* This represents the complete search response from Elasticsearch, including the returned documents.
*
* @author Peter-Josef Meisch
* @author Haibo Liu
* @since 4.0
*/
public class SearchDocumentResponse {
@ -40,10 +42,12 @@ public class SearchDocumentResponse {
@Nullable private final Suggest suggest;
@Nullable String pointInTimeId;
@Nullable private final SearchShardStatistics searchShardStatistics;
public SearchDocumentResponse(long totalHits, String totalHitsRelation, float maxScore, @Nullable String scrollId,
@Nullable String pointInTimeId, List<SearchDocument> searchDocuments,
@Nullable AggregationsContainer<?> aggregationsContainer, @Nullable Suggest suggest) {
@Nullable AggregationsContainer<?> aggregationsContainer, @Nullable Suggest suggest,
@Nullable SearchShardStatistics searchShardStatistics) {
this.totalHits = totalHits;
this.totalHitsRelation = totalHitsRelation;
this.maxScore = maxScore;
@ -52,6 +56,7 @@ public class SearchDocumentResponse {
this.searchDocuments = searchDocuments;
this.aggregations = aggregationsContainer;
this.suggest = suggest;
this.searchShardStatistics = searchShardStatistics;
}
public long getTotalHits() {
@ -93,6 +98,11 @@ public class SearchDocumentResponse {
return pointInTimeId;
}
@Nullable
public SearchShardStatistics getSearchShardStatistics() {
return searchShardStatistics;
}
/**
* A function to convert a {@link SearchDocument} async into an entity. Asynchronous so that it can be used from the
* imperative and the reactive code.

View File

@ -15,14 +15,10 @@
*/
package org.springframework.data.elasticsearch.repository.query;
import java.util.Collections;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchHitsImpl;
import org.springframework.data.elasticsearch.core.TotalHitsRelation;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQuery;
@ -42,6 +38,7 @@ import org.springframework.util.ClassUtils;
* @author Rizwan Idrees
* @author Mohsin Husen
* @author Peter-Josef Meisch
* @author Haibo Liu
*/
public abstract class AbstractElasticsearchRepositoryQuery implements RepositoryQuery {
@ -107,24 +104,13 @@ public abstract class AbstractElasticsearchRepositoryQuery implements Repository
: PageRequest.of(0, DEFAULT_STREAM_BATCH_SIZE));
result = StreamUtils.createStreamFromIterator(elasticsearchOperations.searchForStream(query, clazz, index));
} else if (queryMethod.isCollectionQuery()) {
if (parameterAccessor.getPageable().isUnpaged()) {
int itemCount = (int) elasticsearchOperations.count(query, clazz, index);
if (itemCount == 0) {
result = new SearchHitsImpl<>(0, TotalHitsRelation.EQUAL_TO, Float.NaN, null,
query.getPointInTime() != null ? query.getPointInTime().id() : null, Collections.emptyList(), null, null);
} else {
query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
}
} else {
query.setPageable(parameterAccessor.getPageable());
}
if (result == null) {
result = elasticsearchOperations.search(query, clazz, index);
}
} else {
result = elasticsearchOperations.searchOne(query, clazz, index);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2013-2023 the original author or authors.
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -15,26 +15,35 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.ShardFailure;
import co.elastic.clients.elasticsearch._types.ShardStatistics;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.core.search.Suggestion;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.SoftAssertions;
import org.json.JSONException;
import org.junit.jupiter.api.Test;
import org.springframework.data.elasticsearch.ElasticsearchErrorCause;
import org.springframework.data.elasticsearch.core.SearchShardStatistics;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import static org.assertj.core.api.Assertions.*;
/**
* Tests for the factory class to create {@link SearchDocumentResponse} instances.
*
* @author Sébastien Comeau
* @author Haibo Liu
* @since 5.2
*/
class SearchDocumentResponseBuilderUnitTests {
@ -73,7 +82,7 @@ class SearchDocumentResponseBuilderUnitTests {
.build();
// act
final var actual = SearchDocumentResponseBuilder.from(hitsMetadata, null, null, null, sortProperties, null,
final var actual = SearchDocumentResponseBuilder.from(hitsMetadata, null, null, null, null, sortProperties, null,
jsonpMapper);
// assert
@ -108,4 +117,62 @@ class SearchDocumentResponseBuilderUnitTests {
softly.assertAll();
}
@Test // #2605
void shouldGetShardStatisticsInfo() {
// arrange
HitsMetadata<EntityAsMap> hitsMetadata = new HitsMetadata.Builder<EntityAsMap>()
.total(t -> t
.value(0)
.relation(TotalHitsRelation.Eq))
.hits(new ArrayList<>())
.build();
ShardStatistics shards = new ShardStatistics.Builder()
.total(15)
.successful(14)
.skipped(0)
.failed(1)
.failures(List.of(
ShardFailure.of(sfb -> sfb
.index("test-index")
.node("test-node")
.shard(1)
.reason(rb -> rb
.reason("this is a mock failure in shards")
.causedBy(cbb ->
cbb.reason("inner reason")
.metadata(Map.of("hello", JsonData.of("world")))
)
.type("reason-type")
)
.status("fail")
)
))
.build();
// act
SearchDocumentResponse response = SearchDocumentResponseBuilder.from(hitsMetadata, shards, null, null,
null, null, null, jsonpMapper);
// assert
SearchShardStatistics shardStatistics = response.getSearchShardStatistics();
assertThat(shardStatistics).isNotNull();
assertThat(shardStatistics.getTotal()).isEqualTo(15);
assertThat(shardStatistics.getSuccessful()).isEqualTo(14);
assertThat(shardStatistics.getSkipped()).isEqualTo(0);
assertThat(shardStatistics.getFailed()).isEqualTo(1);
// assert failure
List<SearchShardStatistics.Failure> failures = shardStatistics.getFailures();
assertThat(failures.size()).isEqualTo(1);
assertThat(failures).extracting(SearchShardStatistics.Failure::getIndex).containsExactly("test-index");
assertThat(failures).extracting(SearchShardStatistics.Failure::getElasticsearchErrorCause)
.extracting(ElasticsearchErrorCause::getReason)
.containsExactly("this is a mock failure in shards");
assertThat(failures).extracting(SearchShardStatistics.Failure::getElasticsearchErrorCause)
.extracting(ElasticsearchErrorCause::getCausedBy)
.extracting(ElasticsearchErrorCause::getReason)
.containsExactly("inner reason");
}
}

View File

@ -32,6 +32,7 @@ import org.springframework.data.util.CloseableIterator;
/**
* @author Roman Puchkovskiy
* @author Peter-Josef Meisch
* @author Haibo Liu
*/
class SearchHitSupportTest {
@ -65,7 +66,7 @@ class SearchHitSupportTest {
hits.add(new SearchHit<>(null, null, null, 0, null, null, null, null, null, null, "five"));
SearchHits<String> originalSearchHits = new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, "scroll",
null, hits, null, null);
null, hits, null, null, null);
SearchPage<String> searchPage = SearchHitSupport.searchPageFor(originalSearchHits, PageRequest.of(0, 3));
SearchHits<String> searchHits = searchPage.getSearchHits();

View File

@ -31,6 +31,7 @@ import org.springframework.data.util.StreamUtils;
/**
* @author Sascha Woo
* @author Peter-Josef Meisch
* @author Haibo Liu
*/
public class StreamQueriesTest {
@ -180,6 +181,6 @@ public class StreamQueriesTest {
}
private SearchScrollHits<String> newSearchScrollHits(List<SearchHit<String>> hits, String scrollId) {
return new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, null, hits, null, null);
return new SearchHitsImpl<>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, null, hits, null, null, null);
}
}