From 834b10f57813d9c0fd9875284d2e92b3f8aa7cb5 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 20 Feb 2022 12:26:19 +0100 Subject: [PATCH] Remove blocking code in SearchDocument processing. Original Pull Request #2094 Closes #2025 (cherry picked from commit c1a1ea9724a9cd6590758895281261bc01a272ab) --- ...actElasticsearchRestTransportTemplate.java | 12 +++-- .../core/ElasticsearchRestTemplate.java | 10 ++-- .../core/ElasticsearchTemplate.java | 6 +-- .../core/ReactiveElasticsearchTemplate.java | 21 ++++---- .../core/document/SearchDocumentResponse.java | 48 ++++++++++++++----- ...searchTemplateSuggestIntegrationTests.java | 37 ++++++++------ 6 files changed, 88 insertions(+), 46 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java index cdaf7c36f..a4e82c8c3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -109,7 +110,8 @@ public abstract class AbstractElasticsearchRestTransportTemplate extends Abstrac List> res = new ArrayList<>(queries.size()); int c = 0; for (Query query : queries) { - res.add(callback.doWith(SearchDocumentResponse.from(items[c++].getResponse(), documentCallback::doWith))); + res.add( + callback.doWith(SearchDocumentResponse.from(items[c++].getResponse(), getEntityCreator(documentCallback)))); } return res; } @@ -142,7 +144,7 @@ public abstract class AbstractElasticsearchRestTransportTemplate extends Abstrac index); SearchResponse response = items[c++].getResponse(); - res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith))); + res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)))); } return res; } @@ -175,7 +177,7 @@ public abstract class AbstractElasticsearchRestTransportTemplate extends Abstrac index); SearchResponse response = items[c++].getResponse(); - res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith))); + res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)))); } return res; } @@ -215,5 +217,9 @@ public abstract class AbstractElasticsearchRestTransportTemplate extends Abstrac return suggest(suggestion, getIndexCoordinatesFor(clazz)); } + protected SearchDocumentResponse.EntityCreator getEntityCreator(ReadDocumentCallback documentCallback) { + return searchDocument -> CompletableFuture.completedFuture(documentCallback.doWith(searchDocument)); + } + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 1d73a2b72..c7a852088 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2021 the original author or authors. + * Copyright 2013-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -319,7 +319,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchRestTranspor ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override @@ -336,7 +336,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchRestTranspor ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override @@ -351,7 +351,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchRestTranspor ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override @@ -378,8 +378,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchRestTranspor Assert.isTrue(items.length == request.requests().size(), "Response should has same length with queries"); return items; } - // endregion + // endregion // region ClientCallback /** * Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index c58d3f4c6..0f84a1f33 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -354,7 +354,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchRestTransportTem ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override @@ -372,7 +372,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchRestTransportTem ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override @@ -389,7 +389,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchRestTransportTem ReadDocumentCallback documentCallback = new ReadDocumentCallback(elasticsearchConverter, clazz, index); SearchDocumentResponseCallback> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz, index); - return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)); + return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))); } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 131711483..af6d7db63 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import org.elasticsearch.Version; @@ -771,15 +770,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera }); } - private Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { + private Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { return Mono.defer(() -> { SearchRequest request = requestFactory.searchRequest(query, clazz, index); request = prepareSearchRequest(request, false); SearchDocumentCallback documentCallback = new ReadSearchDocumentCallback<>(clazz, index); + // noinspection unchecked + SearchDocumentResponse.EntityCreator entityCreator = searchDocument -> ((Mono) documentCallback + .toEntity(searchDocument)).toFuture(); - return doFindForResponse(request, searchDocument -> documentCallback.toEntity(searchDocument).block()); + return doFindForResponse(request, entityCreator); }); } @@ -896,19 +898,18 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera * Customization hook on the actual execution result {@link Mono}.
* * @param request the already prepared {@link SearchRequest} ready to be executed. - * @param suggestEntityCreator + * @param entityCreator * @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}. */ - protected Mono doFindForResponse(SearchRequest request, - Function suggestEntityCreator) { + protected Mono doFindForResponse(SearchRequest request, + SearchDocumentResponse.EntityCreator entityCreator) { if (QUERY_LOGGER.isDebugEnabled()) { QUERY_LOGGER.debug("Executing doFindForResponse: {}", request); } - return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(searchResponse -> { - return SearchDocumentResponse.from(searchResponse, suggestEntityCreator); - }); + return Mono.from(execute(client -> client.searchForResponse(request))) + .map(searchResponse -> SearchDocumentResponse.from(searchResponse, entityCreator)); } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java index f45f425c5..555f7f316 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,11 @@ package org.springframework.data.elasticsearch.core.document; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.text.Text; @@ -38,13 +41,15 @@ import org.springframework.util.Assert; /** * This represents the complete search response from Elasticsearch, including the returned documents. Instances must be - * created with the {@link #from(SearchResponse,Function)} method. + * created with the {@link #from(SearchResponse, EntityCreator)} method. * * @author Peter-Josef Meisch * @since 4.0 */ public class SearchDocumentResponse { + private static final Log LOGGER = LogFactory.getLog(SearchDocumentResponse.class); + private final long totalHits; private final String totalHitsRelation; private final float maxScore; @@ -98,12 +103,11 @@ public class SearchDocumentResponse { * creates a SearchDocumentResponse from the {@link SearchResponse} * * @param searchResponse must not be {@literal null} - * @param suggestEntityCreator function to create an entity from a {@link SearchDocument} + * @param entityCreator function to create an entity from a {@link SearchDocument} * @param entity type * @return the SearchDocumentResponse */ - public static SearchDocumentResponse from(SearchResponse searchResponse, - Function suggestEntityCreator) { + public static SearchDocumentResponse from(SearchResponse searchResponse, EntityCreator entityCreator) { Assert.notNull(searchResponse, "searchResponse must not be null"); @@ -112,7 +116,7 @@ public class SearchDocumentResponse { Aggregations aggregations = searchResponse.getAggregations(); org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest(); - return from(searchHits, scrollId, aggregations, suggest, suggestEntityCreator); + return from(searchHits, scrollId, aggregations, suggest, entityCreator); } /** @@ -122,14 +126,14 @@ public class SearchDocumentResponse { * @param scrollId scrollId * @param aggregations aggregations * @param suggestES the suggestion response from Elasticsearch - * @param suggestEntityCreator function to create an entity from a {@link SearchDocument} + * @param entityCreator function to create an entity from a {@link SearchDocument} * @param entity type * @return the {@link SearchDocumentResponse} * @since 4.3 */ public static SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId, @Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES, - Function suggestEntityCreator) { + EntityCreator entityCreator) { TotalHits responseTotalHits = searchHits.getTotalHits(); @@ -153,14 +157,14 @@ public class SearchDocumentResponse { } } - Suggest suggest = suggestFrom(suggestES, suggestEntityCreator); + Suggest suggest = suggestFrom(suggestES, entityCreator); return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations, suggest); } @Nullable private static Suggest suggestFrom(@Nullable org.elasticsearch.search.suggest.Suggest suggestES, - Function entityCreator) { + EntityCreator entityCreator) { if (suggestES == null) { return null; @@ -219,7 +223,19 @@ public class SearchDocumentResponse { List> options = new ArrayList<>(); for (org.elasticsearch.search.suggest.completion.CompletionSuggestion.Entry.Option optionES : entryES) { SearchDocument searchDocument = optionES.getHit() != null ? DocumentAdapters.from(optionES.getHit()) : null; - T hitEntity = searchDocument != null ? entityCreator.apply(searchDocument) : null; + + T hitEntity = null; + + if (searchDocument != null) { + try { + hitEntity = entityCreator.apply(searchDocument).get(); + } catch (Exception e) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Error creating entity from SearchDocument"); + } + } + } + options.add(new CompletionSuggestion.Entry.Option(textToString(optionES.getText()), textToString(optionES.getHighlighted()), optionES.getScore(), optionES.collateMatch(), optionES.getContexts(), scoreDocFrom(optionES.getDoc()), searchDocument, hitEntity)); @@ -254,4 +270,14 @@ public class SearchDocumentResponse { private static String textToString(@Nullable Text text) { return text != null ? text.string() : ""; } + + /** + * A function to convert a {@link SearchDocument} async into an entity. Asynchronous so that it can be used from the + * imperative and the reactive code. + * + * @param the entity type + */ + @FunctionalInterface + public interface EntityCreator extends Function> {} + } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveElasticsearchTemplateSuggestIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveElasticsearchTemplateSuggestIntegrationTests.java index 64c40390e..08f8df43d 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveElasticsearchTemplateSuggestIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveElasticsearchTemplateSuggestIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core.suggest; import static org.assertj.core.api.Assertions.*; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.util.ArrayList; @@ -39,8 +40,8 @@ import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; +import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.suggest.response.CompletionSuggestion; import org.springframework.data.elasticsearch.core.suggest.response.Suggest; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; @@ -86,13 +87,11 @@ public class ReactiveElasticsearchTemplateSuggestIntegrationTests { @DisplayName("should find suggestions for given prefix completion") void shouldFindSuggestionsForGivenPrefixCompletion() { - loadCompletionObjectEntities(); - - NativeSearchQuery query = new NativeSearchQueryBuilder().withSuggestBuilder(new SuggestBuilder() - .addSuggestion("test-suggest", SuggestBuilders.completionSuggestion("suggest").prefix("m", Fuzziness.AUTO))) - .build(); - - operations.suggest(query, CompletionEntity.class) // + loadCompletionObjectEntities() // + .flatMap(unused -> { + Query query = getSuggestQuery("test-suggest", "suggest", "m"); + return operations.suggest(query, CompletionEntity.class); + }) // .as(StepVerifier::create) // .assertNext(suggest -> { Suggest.Suggestion> suggestion = suggest @@ -105,13 +104,21 @@ public class ReactiveElasticsearchTemplateSuggestIntegrationTests { assertThat(options).hasSize(2); assertThat(options.get(0).getText()).isIn("Marchand", "Mohsin"); assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin"); - }) // .verifyComplete(); } + protected Query getSuggestQuery(String suggestionName, String fieldName, String prefix) { + return new NativeSearchQueryBuilder() // + .withSuggestBuilder(new SuggestBuilder() // + .addSuggestion(suggestionName, // + SuggestBuilders.completionSuggestion(fieldName) // + .prefix(prefix, Fuzziness.AUTO))) // + .build(); // + } + // region helper functions - private void loadCompletionObjectEntities() { + private Mono loadCompletionObjectEntities() { CompletionEntity rizwan_idrees = new CompletionEntityBuilder("1").name("Rizwan Idrees") .suggest(new String[] { "Rizwan Idrees" }).build(); @@ -124,7 +131,7 @@ public class ReactiveElasticsearchTemplateSuggestIntegrationTests { List entities = new ArrayList<>( Arrays.asList(rizwan_idrees, franck_marchand, mohsin_husen, artur_konczak)); IndexCoordinates index = IndexCoordinates.of(indexNameProvider.indexName()); - operations.saveAll(entities, index).blockLast(); + return operations.saveAll(entities, index).last(); } // endregion @@ -132,11 +139,13 @@ public class ReactiveElasticsearchTemplateSuggestIntegrationTests { @Document(indexName = "#{@indexNameProvider.indexName()}") static class CompletionEntity { - @Nullable @Id private String id; + @Nullable + @Id private String id; @Nullable private String name; - @Nullable @CompletionField(maxInputLength = 100) private Completion suggest; + @Nullable + @CompletionField(maxInputLength = 100) private Completion suggest; private CompletionEntity() {}