Automatically close scroll context when returning streamed results.

Original Pull Request #1746
Closes #1745
This commit is contained in:
Peter-Josef Meisch 2021-03-27 15:44:46 +01:00 committed by GitHub
parent 3500dad2bc
commit 13ab2b9e95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 12 deletions

View File

@ -66,10 +66,14 @@ abstract class StreamQueries {
private volatile Iterator<SearchHit<T>> currentScrollHits = searchHits.iterator();
private volatile boolean continueScroll = currentScrollHits.hasNext();
private volatile ScrollState scrollState = new ScrollState(searchHits.getScrollId());
private volatile boolean isClosed = false;
@Override
public void close() {
clearScrollConsumer.accept(scrollState.getScrollIds());
if (!isClosed) {
clearScrollConsumer.accept(scrollState.getScrollIds());
isClosed = true;
}
}
@Override
@ -96,18 +100,24 @@ abstract class StreamQueries {
@Override
public boolean hasNext() {
if (!continueScroll || (maxCount > 0 && currentCount.get() >= maxCount)) {
return false;
boolean hasNext = false;
if (!isClosed && continueScroll && (maxCount <= 0 || currentCount.get() < maxCount)) {
if (!currentScrollHits.hasNext()) {
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
currentScrollHits = nextPage.iterator();
scrollState.updateScrollId(nextPage.getScrollId());
continueScroll = currentScrollHits.hasNext();
}
hasNext = currentScrollHits.hasNext();
}
if (!currentScrollHits.hasNext()) {
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
currentScrollHits = nextPage.iterator();
scrollState.updateScrollId(nextPage.getScrollId());
continueScroll = currentScrollHits.hasNext();
if (!hasNext) {
close();
}
return currentScrollHits.hasNext();
return hasNext;
}
@Override

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.data.util.StreamUtils;
@ -39,6 +40,8 @@ public class StreamQueriesTest {
// given
List<SearchHit<String>> hits = new ArrayList<>();
hits.add(getOneSearchHit());
hits.add(getOneSearchHit());
hits.add(getOneSearchHit());
SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");
@ -51,9 +54,7 @@ public class StreamQueriesTest {
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollIds -> clearScrollCalled.set(true));
while (iterator.hasNext()) {
iterator.next();
}
iterator.next();
iterator.close();
// then
@ -61,6 +62,27 @@ public class StreamQueriesTest {
}
@Test // #1745
@DisplayName("should call clearScroll when no more data is available")
void shouldCallClearScrollWhenNoMoreDataIsAvailable() {
List<SearchHit<String>> hits = new ArrayList<>();
hits.add(getOneSearchHit());
SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");
AtomicBoolean clearScrollCalled = new AtomicBoolean(false);
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
0, //
searchHits, //
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollIds -> clearScrollCalled.set(true));
while (iterator.hasNext()) {
iterator.next();
}
assertThat(clearScrollCalled).isTrue();
}
private SearchHit<String> getOneSearchHit() {
return new SearchHit<String>(null, null, null, 0, null, null, null, null, null, null, "one");
}