Fix code not terminating on repository saving an empty flux.

Original Pull Request #3099
Closes: #3039

Signed-off-by: Peter-Josef Meisch <pj.meisch@sothawo.com>
(cherry picked from commit a07ac3c93d6cf05b4713620877c4d45958b7d514)
This commit is contained in:
Peter-Josef Meisch 2025-04-26 10:40:31 +02:00
parent 2c20671379
commit 0e8401d3b1
No known key found for this signature in database
GPG Key ID: DE108246970C7708
2 changed files with 26 additions and 0 deletions

View File

@ -233,6 +233,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
.subscribe(new Subscriber<>() {
@Nullable private Subscription subscription = null;
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);
private final AtomicBoolean onNextHasBeenCalled = new AtomicBoolean(false);
@Override
public void onSubscribe(Subscription subscription) {
@ -242,6 +243,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
@Override
public void onNext(List<T> entityList) {
onNextHasBeenCalled.set(true);
saveAll(entityList, index)
.map(sink::tryEmitNext)
.doOnComplete(() -> {
@ -267,6 +269,10 @@ abstract public class AbstractReactiveElasticsearchTemplate
@Override
public void onComplete() {
upstreamComplete.set(true);
if (!onNextHasBeenCalled.get()) {
// this happens when an empty flux is saved
sink.tryEmitComplete();
}
}
});
return sink.asFlux();

View File

@ -105,6 +105,26 @@ abstract class SimpleReactiveElasticsearchRepositoryIntegrationTests {
return operations.exists(id, IndexCoordinates.of(indexNameProvider.indexName()));
}
@Test // #3093
@DisplayName("should save all from empty collection")
void shouldSaveAllFromEmptyCollection() {
repository.saveAll(Collections.emptyList())
.as(StepVerifier::create)
.expectNextCount(0)
.verifyComplete();
}
@Test // #3093
@DisplayName("should save all from empty flux")
void shouldSaveAllFromEmptyFlux() {
repository.saveAll(Flux.empty())
.as(StepVerifier::create)
.expectNextCount(0)
.verifyComplete();
}
@Test // DATAES-519
void saveShouldComputeMultipleEntities() {