From 797dbb5a18df091c45d9890f424034d2738dfd5e Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 19 Mar 2023 15:01:24 +0100 Subject: [PATCH] Improve save(Flux) implementations. Original Pull Request #2497 Closes #2496 Closes #2492 --- ...AbstractReactiveElasticsearchTemplate.java | 59 +++++++++++++++++++ .../core/ReactiveDocumentOperations.java | 58 ++++++++++++++++++ ...SimpleReactiveElasticsearchRepository.java | 4 +- ...ReactiveElasticsearchIntegrationTests.java | 37 ++++++++++-- .../suggest/CompletionIntegrationTests.java | 3 - .../ReactiveSuggestIntegrationTests.java | 1 - ...asticsearchRepositoryIntegrationTests.java | 25 ++++++++ 7 files changed, 176 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index ff85e88e0..0a77504c8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -17,13 +17,17 @@ package org.springframework.data.elasticsearch.core; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import reactor.util.function.Tuple2; import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -207,6 +211,61 @@ abstract public class AbstractReactiveElasticsearchTemplate return save(entity, getIndexCoordinatesFor(entity.getClass())); } + @Override + public Flux save(Flux entities, Class clazz, int bulkSize) { + return save(entities, getIndexCoordinatesFor(clazz), bulkSize); + } + + @Override + public Flux save(Flux entities, IndexCoordinates index, int bulkSize) { + + Assert.notNull(entities, "entities must not be null"); + Assert.notNull(index, "index must not be null"); + Assert.isTrue(bulkSize > 0, "bulkSize must be greater than 0"); + + return Flux.defer(() -> { + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + entities // + .bufferTimeout(bulkSize, Duration.ofMillis(200)) // + .subscribe(new Subscriber>() { + private Subscription subscription; + private AtomicBoolean upstreamComplete = new AtomicBoolean(false); + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(List entityList) { + saveAll(entityList, index) // + .map(sink::tryEmitNext) // + .doOnComplete(() -> { + if (!upstreamComplete.get()) { + subscription.request(1); + } else { + sink.tryEmitComplete(); + } + }).subscribe(); + } + + @Override + public void onError(Throwable throwable) { + subscription.cancel(); + sink.tryEmitError(throwable); + } + + @Override + public void onComplete() { + upstreamComplete.set(true); + } + }); + return sink.asFlux(); + }); + + } + @Override public Flux saveAll(Mono> entities, Class clazz) { return saveAll(entities, getIndexCoordinatesFor(clazz)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 3e24f4c37..65aec898d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -44,6 +44,9 @@ import org.springframework.util.Assert; * @since 4.0 */ public interface ReactiveDocumentOperations { + + int FLUX_SAVE_BULK_SIZE = 500; + /** * Index the given entity, once available, extracting index from entity metadata. * @@ -93,6 +96,61 @@ public interface ReactiveDocumentOperations { */ Mono save(T entity, IndexCoordinates index); + /** + * Indexes the entities into the index extracted from entity metadata. + * + * @param entities + * @param clazz the class to get the index name from + * @param entity type + * @return a Flux emitting the saved entities + * @since 5.1 + */ + default Flux save(Flux entities, Class clazz) { + return save(entities, clazz, FLUX_SAVE_BULK_SIZE); + } + + /** + * Indexes the entities into the index extracted from entity metadata. The entities are collected into batches of + * {bulkSize} with a maximal timeout of 200 ms, see + * {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time .Duration)} and then sent in a bulk operation to + * Elasticsearch. + * + * @param entities + * @param clazz the class to get the index name from + * @param bulkSize number of entities to put in a bulk request + * @param entity type + * @return a Flux emitting the saved entities + * @since 5.1 + */ + Flux save(Flux entities, Class clazz, int bulkSize); + + /** + * Indexes the entities into the given index. + * + * @param entities the entities to save + * @param index the index to save to + * @param entity type + * @return a Flux emitting the saved entities + * @since 5.1 + */ + default Flux save(Flux entities, IndexCoordinates index) { + return save(entities, index, FLUX_SAVE_BULK_SIZE); + } + + /** + * Indexes the entities into the given index. The entities are collected into batches of {bulkSize} with a maximal + * timeout of 200 ms, see {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time * .Duration)} and then sent + * in a bulk operation to Elasticsearch. + * + * @param entities the entities to save + * @param index the index to save to + * @param bulkSize number of entities to put in a bulk request + * @param entity type + * @return a Flux emitting the saved entities + * @since 5.1 + */ + Flux save(Flux entities, IndexCoordinates index, int bulkSize); + /** * Index entities the index extracted from entity metadata. * diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index 9d9d04eba..b0279e4cf 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -15,7 +15,6 @@ */ package org.springframework.data.elasticsearch.repository.support; -import org.springframework.data.elasticsearch.core.query.BaseQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,6 +29,7 @@ import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.util.Assert; @@ -97,7 +97,7 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla Assert.notNull(entityStream, "EntityStream must not be null!"); - return operations.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates()) + return operations.save(Flux.from(entityStream), entityInformation.getIndexCoordinates()) .concatWith(doRefresh().then(Mono.empty())); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java index ab18dd3c2..80712d7b2 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java @@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; -import org.springframework.data.elasticsearch.annotations.IndexedIndexName; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -61,6 +61,7 @@ import org.springframework.data.elasticsearch.RestStatusException; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; +import org.springframework.data.elasticsearch.annotations.IndexedIndexName; import org.springframework.data.elasticsearch.annotations.Mapping; import org.springframework.data.elasticsearch.annotations.Setting; import org.springframework.data.elasticsearch.annotations.WriteOnlyProperty; @@ -170,7 +171,6 @@ public abstract class ReactiveElasticsearchIntegrationTests { assertThat(saved.getIndexedIndexName()).isEqualTo(indexNameProvider.indexName() + "-indexedindexname"); } - private Mono documentWithIdExistsInIndex(String id, String index) { return operations.exists(id, IndexCoordinates.of(index)); } @@ -1180,6 +1180,25 @@ public abstract class ReactiveElasticsearchIntegrationTests { assertThat(readEntity.getPart2()).isEqualTo(entity.getPart2()); // }).verifyComplete(); } + + @Test // #2496 + @DisplayName("should save data from Flux and return saved data in a flux") + void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() { + + var count = 12_345; + var entityList = IntStream.rangeClosed(1, count)// + .mapToObj(SampleEntity::of) // + .collect(Collectors.toList()); + + var entityFlux = Flux.fromIterable(entityList); + + operations.save(entityFlux, SampleEntity.class).collectList() // + .as(StepVerifier::create) // + .consumeNextWith(savedEntities -> { + assertThat(savedEntities).isEqualTo(entityList); + }) // + .verifyComplete(); + } // endregion // region Helper functions @@ -1262,6 +1281,13 @@ public abstract class ReactiveElasticsearchIntegrationTests { @Nullable @Version private Long version; + static SampleEntity of(int id) { + var entity = new SampleEntity(); + entity.setId("" + id); + entity.setMessage(" message " + id); + return entity; + } + @Nullable public String getId() { return id; @@ -1543,6 +1569,7 @@ public abstract class ReactiveElasticsearchIntegrationTests { this.part2 = part2; } } + @Document(indexName = "#{@indexNameProvider.indexName()}-indexedindexname") private static class IndexedIndexNameEntity { @Nullable @@ -1550,8 +1577,7 @@ public abstract class ReactiveElasticsearchIntegrationTests { @Nullable @Field(type = Text) private String someText; @Nullable - @IndexedIndexName - private String indexedIndexName; + @IndexedIndexName private String indexedIndexName; @Nullable public String getId() { @@ -1579,5 +1605,6 @@ public abstract class ReactiveElasticsearchIntegrationTests { public void setIndexedIndexName(@Nullable String indexedIndexName) { this.indexedIndexName = indexedIndexName; } - } // endregion + } + // endregion } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/suggest/CompletionIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/suggest/CompletionIntegrationTests.java index ca3305d58..4bf70ad05 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/suggest/CompletionIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/suggest/CompletionIntegrationTests.java @@ -112,7 +112,6 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie operations.bulkIndex(indexQueries, AnnotatedCompletionEntity.class); } - // @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150") @Test public void shouldFindSuggestionsForGivenCriteriaQueryUsingCompletionEntity() { @@ -144,7 +143,6 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie operations.get("1", CompletionEntity.class); } - // @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150") @Test public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() { @@ -168,7 +166,6 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin"); } - // @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES 1issue 150") @Test public void shouldFindSuggestionsWithWeightsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() { diff --git a/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java index 7e562e38e..c6ef42296 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java @@ -66,7 +66,6 @@ public abstract class ReactiveSuggestIntegrationTests implements NewElasticsearc operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete().block(); } - // @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150") @Test // #1302 @DisplayName("should find suggestions for given prefix completion") void shouldFindSuggestionsForGivenPrefixCompletion() { diff --git a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java index 0a8d72f8b..69d64f4e6 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java @@ -739,6 +739,24 @@ abstract class SimpleReactiveElasticsearchRepositoryIntegrationTests { .verifyComplete(); } + @Test // #2496 + @DisplayName("should save data from Flux and return saved data in a flux") + void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() { + var count = 12_345; + var entityList = IntStream.rangeClosed(1, count)// + .mapToObj(SampleEntity::of) // + .collect(Collectors.toList()); + + var entityFlux = Flux.fromIterable(entityList); + + repository.saveAll(entityFlux).collectList() // + .as(StepVerifier::create) // + .consumeNextWith(savedEntities -> { + assertThat(savedEntities).isEqualTo(entityList); + }) // + .verifyComplete(); + } + Mono bulkIndex(SampleEntity... entities) { return operations.saveAll(Arrays.asList(entities), IndexCoordinates.of(indexNameProvider.indexName())).then(); } @@ -829,6 +847,13 @@ abstract class SimpleReactiveElasticsearchRepositoryIntegrationTests { @Field(name = "custom_field_name", type = FieldType.Text) @Nullable private String customFieldNameMessage; + static SampleEntity of(int id) { + var entity = new SampleEntity(); + entity.setId("" + id); + entity.setMessage(" message " + id); + return entity; + } + public SampleEntity() {} public SampleEntity(@Nullable String id) {