From d6b55406142abe2f72d2da29f7180621d0380b08 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Fri, 2 Jun 2023 15:54:57 +0200 Subject: [PATCH] Fix reactive save of Flux. Original Pull Request #2581 Closes #2576 --- ...AbstractReactiveElasticsearchTemplate.java | 64 +++++++++---------- ...ReactiveElasticsearchIntegrationTests.java | 9 ++- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 0a77504c8..a053edb40 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -225,42 +225,42 @@ abstract public class AbstractReactiveElasticsearchTemplate return Flux.defer(() -> { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - entities // - .bufferTimeout(bulkSize, Duration.ofMillis(200)) // - .subscribe(new Subscriber>() { - private Subscription subscription; - private AtomicBoolean upstreamComplete = new AtomicBoolean(false); + entities.window(bulkSize) // + .concatMap(flux -> flux.collectList()) // + .subscribe(new Subscriber>() { + private Subscription subscription; + private AtomicBoolean upstreamComplete = new AtomicBoolean(false); - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } - @Override - public void onNext(List entityList) { - saveAll(entityList, index) // - .map(sink::tryEmitNext) // - .doOnComplete(() -> { - if (!upstreamComplete.get()) { - subscription.request(1); - } else { - sink.tryEmitComplete(); - } - }).subscribe(); - } + @Override + public void onNext(List entityList) { + saveAll(entityList, index) // + .map(sink::tryEmitNext) // + .doOnComplete(() -> { + if (!upstreamComplete.get()) { + subscription.request(1); + } else { + sink.tryEmitComplete(); + } + }).subscribe(); + } - @Override - public void onError(Throwable throwable) { - subscription.cancel(); - sink.tryEmitError(throwable); - } + @Override + public void onError(Throwable throwable) { + subscription.cancel(); + sink.tryEmitError(throwable); + } - @Override - public void onComplete() { - upstreamComplete.set(true); - } - }); + @Override + public void onComplete() { + upstreamComplete.set(true); + } + }); return sink.asFlux(); }); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java index 65ecf6788..33869f5fd 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java @@ -28,6 +28,7 @@ import java.lang.Boolean; import java.lang.Integer; import java.lang.Long; import java.lang.Object; +import java.time.Duration; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -1171,7 +1172,7 @@ public abstract class ReactiveElasticsearchIntegrationTests { }).verifyComplete(); } - @Test // #2496 + @Test // #2496, #2576 @DisplayName("should save data from Flux and return saved data in a flux") void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() { @@ -1180,9 +1181,11 @@ public abstract class ReactiveElasticsearchIntegrationTests { .mapToObj(SampleEntity::of) // .collect(Collectors.toList()); - var entityFlux = Flux.fromIterable(entityList); + // we add a random delay to make suure the underlying implementation handles irregular incoming data + var entities = Flux.fromIterable(entityList).concatMap( + entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity)); - operations.save(entityFlux, SampleEntity.class).collectList() // + operations.save(entities, SampleEntity.class).collectList() // .as(StepVerifier::create) // .consumeNextWith(savedEntities -> { assertThat(savedEntities).isEqualTo(entityList);