mirror of
https://github.com/spring-projects/spring-data-elasticsearch.git
synced 2025-05-31 09:12:11 +00:00
Fix code not terminating on repository saving an empty flux.
Original Pull Request #3099 Closes: #3039 Signed-off-by: Peter-Josef Meisch <pj.meisch@sothawo.com> (cherry picked from commit a07ac3c93d6cf05b4713620877c4d45958b7d514) (cherry picked from commit 0e8401d3b1a97c872e788719dbfbec3c8d3ebaf2)
This commit is contained in:
parent
b10bc3d693
commit
71383536d4
@ -233,6 +233,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
|
||||
.subscribe(new Subscriber<>() {
|
||||
@Nullable private Subscription subscription = null;
|
||||
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);
|
||||
private final AtomicBoolean onNextHasBeenCalled = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
@ -242,6 +243,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
|
||||
|
||||
@Override
|
||||
public void onNext(List<T> entityList) {
|
||||
onNextHasBeenCalled.set(true);
|
||||
saveAll(entityList, index)
|
||||
.map(sink::tryEmitNext)
|
||||
.doOnComplete(() -> {
|
||||
@ -267,6 +269,10 @@ abstract public class AbstractReactiveElasticsearchTemplate
|
||||
@Override
|
||||
public void onComplete() {
|
||||
upstreamComplete.set(true);
|
||||
if (!onNextHasBeenCalled.get()) {
|
||||
// this happens when an empty flux is saved
|
||||
sink.tryEmitComplete();
|
||||
}
|
||||
}
|
||||
});
|
||||
return sink.asFlux();
|
||||
|
@ -105,6 +105,26 @@ abstract class SimpleReactiveElasticsearchRepositoryIntegrationTests {
|
||||
return operations.exists(id, IndexCoordinates.of(indexNameProvider.indexName()));
|
||||
}
|
||||
|
||||
@Test // #3093
|
||||
@DisplayName("should save all from empty collection")
|
||||
void shouldSaveAllFromEmptyCollection() {
|
||||
|
||||
repository.saveAll(Collections.emptyList())
|
||||
.as(StepVerifier::create)
|
||||
.expectNextCount(0)
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // #3093
|
||||
@DisplayName("should save all from empty flux")
|
||||
void shouldSaveAllFromEmptyFlux() {
|
||||
|
||||
repository.saveAll(Flux.empty())
|
||||
.as(StepVerifier::create)
|
||||
.expectNextCount(0)
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAES-519
|
||||
void saveShouldComputeMultipleEntities() {
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user