DATAES-764 - StreamQueries#streamResults does not clear scroll context when finished.

Original PR: #406
This commit is contained in:
Sascha Woo 2020-03-18 20:37:05 +01:00 committed by GitHub
parent 300eb313dd
commit f103bdb9d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 110 additions and 32 deletions

View File

@ -7,9 +7,9 @@ import org.springframework.lang.Nullable;
/**
* @author Artur Konczak
* @author Peter-Josef Meisch
* @author Sascha Woo
*/
public interface ScrolledPage<T> extends Page<T> {
@Nullable
String getScrollId();
String getScrollId();
}

View File

@ -21,11 +21,13 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.springframework.data.util.CloseableIterator;
import org.springframework.util.Assert;
/**
* Utility to support streaming queries.
*
* @author Mark Paluch
* @author Sascha Woo
* @since 3.2
*/
abstract class StreamQueries {
@ -33,72 +35,71 @@ abstract class StreamQueries {
/**
* Stream query results using {@link ScrolledPage}.
*
* @param page the initial page.
* @param continueFunction continuation function accepting the current scrollId.
* @param clearScroll cleanup function accepting the current scrollId.
* @param page the initial scrolled page.
* @param continueScrollFunction function to continue scrolling applies to the current scrollId.
* @param clearScrollConsumer consumer to clear the scroll context by accepting the current scrollId.
* @param <T>
* @return the {@link CloseableIterator}.
*/
static <T> CloseableIterator<T> streamResults(ScrolledPage<T> page,
Function<String, ScrolledPage<T>> continueFunction, Consumer<String> clearScroll) {
Function<String, ScrolledPage<T>> continueScrollFunction, Consumer<String> clearScrollConsumer) {
Assert.notNull(page, "page must not be null.");
Assert.notNull(page.getScrollId(), "scrollId must not be null.");
Assert.notNull(continueScrollFunction, "continueScrollFunction must not be null.");
Assert.notNull(clearScrollConsumer, "clearScrollConsumer must not be null.");
return new CloseableIterator<T>() {
/** As we couldn't retrieve single result with scroll, store current hits. */
private volatile Iterator<T> currentHits = page.iterator();
/** The scroll id. */
// As we couldn't retrieve single result with scroll, store current hits.
private volatile Iterator<T> scrollHits = page.iterator();
private volatile String scrollId = page.getScrollId();
/** If stream is finished (ie: cluster returns no results. */
private volatile boolean finished = !currentHits.hasNext();
private volatile boolean continueScroll = scrollHits.hasNext();
@Override
public void close() {
try {
// Clear scroll on cluster only in case of error (cause elasticsearch auto clear scroll when it's done)
if (!finished && scrollId != null && currentHits != null && currentHits.hasNext()) {
clearScroll.accept(scrollId);
}
clearScrollConsumer.accept(scrollId);
} finally {
currentHits = null;
scrollHits = null;
scrollId = null;
}
}
@Override
public boolean hasNext() {
// Test if stream is finished
if (finished) {
if (!continueScroll) {
return false;
}
// Test if it remains hits
if (currentHits == null || !currentHits.hasNext()) {
// Do a new request
ScrolledPage<T> scroll = continueFunction.apply(scrollId);
// Save hits and scroll id
currentHits = scroll.iterator();
finished = !currentHits.hasNext();
scrollId = scroll.getScrollId();
if (!scrollHits.hasNext()) {
ScrolledPage<T> nextPage = continueScrollFunction.apply(scrollId);
scrollHits = nextPage.iterator();
scrollId = nextPage.getScrollId();
continueScroll = scrollHits.hasNext();
}
return currentHits.hasNext();
return scrollHits.hasNext();
}
@Override
public T next() {
if (hasNext()) {
return currentHits.next();
return scrollHits.next();
}
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException("remove");
throw new UnsupportedOperationException();
}
};
}
// utility constructor
private StreamQueries() {}
private StreamQueries() {
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
/**
* @author Sascha Woo
*/
public class StreamQueriesTest {
@Test // DATAES-764
public void shouldCallClearScrollOnIteratorClose() {
// given
List<String> results = new ArrayList<>();
results.add("one");
ScrolledPage<String> page = new ScrolledPageImpl("1234", results);
AtomicBoolean clearScrollCalled = new AtomicBoolean(false);
// when
CloseableIterator<String> closeableIterator = StreamQueries.streamResults( //
page, //
scrollId -> new ScrolledPageImpl(scrollId, Collections.emptyList()), //
scrollId -> clearScrollCalled.set(true));
while (closeableIterator.hasNext()) {
closeableIterator.next();
}
closeableIterator.close();
// then
assertThat(clearScrollCalled).isTrue();
}
private static class ScrolledPageImpl extends PageImpl<String> implements ScrolledPage<String> {
private String scrollId;
public ScrolledPageImpl(String scrollId, List<String> content) {
super(content);
this.scrollId = scrollId;
}
@Override
@Nullable
public String getScrollId() {
return scrollId;
}
}
}