DATAES-817 - StreamQueries does only delete the last scrollid. (#449)

Original PR: #449
This commit is contained in:
Peter-Josef Meisch 2020-05-07 22:11:18 +02:00 committed by GitHub
parent 3c9b0a7b2f
commit ec7414c356
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 186 additions and 71 deletions

View File

@ -36,10 +36,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -93,7 +90,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
@ -105,6 +101,7 @@ import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -115,7 +112,6 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.BodyExtractors;
@ -835,8 +831,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
}
return Mono.just(content);
})
.doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
}).doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
.flatMap(content -> doDecode(response, responseType, content));
}
@ -893,42 +888,5 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
}
/**
* Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}
*
* @author Christoph Strobl
* @since 3.2
*/
private static class ScrollState {
private final Object lock = new Object();
private final List<String> pastIds = new ArrayList<>(1);
@Nullable private String scrollId;
@Nullable
String getScrollId() {
return scrollId;
}
List<String> getScrollIds() {
synchronized (lock) {
return Collections.unmodifiableList(new ArrayList<>(pastIds));
}
}
void updateScrollId(String scrollId) {
if (StringUtils.hasText(scrollId)) {
synchronized (lock) {
this.scrollId = scrollId;
pastIds.add(scrollId);
}
}
}
}
// endregion
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.search.Scroll;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
/**
* Mutable state object holding scrollId to be used for {@link SearchScrollRequest#scroll(Scroll)}
*
* @author Christoph Strobl
* @author Peter-Josef Meisch
* @since 3.2
*/
public class ScrollState {
private final Object lock = new Object();
private final Set<String> pastIds = new LinkedHashSet<>();
@Nullable private String scrollId;
public ScrollState() {}
public ScrollState(String scrollId) {
updateScrollId(scrollId);
}
@Nullable
public String getScrollId() {
return scrollId;
}
public List<String> getScrollIds() {
synchronized (lock) {
return Collections.unmodifiableList(new ArrayList<>(pastIds));
}
}
public void updateScrollId(String scrollId) {
if (StringUtils.hasText(scrollId)) {
synchronized (lock) {
this.scrollId = scrollId;
pastIds.add(scrollId);
}
}
}
}

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -341,7 +342,14 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
/*
* internal use only, not for public API
*/
abstract protected void searchScrollClear(String scrollId);
protected void searchScrollClear(String scrollId) {
searchScrollClear(Collections.singletonList(scrollId));
}
/*
* internal use only, not for public API
*/
abstract protected void searchScrollClear(List<String> scrollIds);
abstract protected MultiSearchResponse.Item[] getMultiSearchResult(MultiSearchRequest request);
// endregion

View File

@ -299,9 +299,9 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
}
@Override
public void searchScrollClear(String scrollId) {
public void searchScrollClear(List<String> scrollIds) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
request.scrollIds(scrollIds);
execute(client -> client.clearScroll(request, RequestOptions.DEFAULT));
}

View File

@ -321,8 +321,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
}
@Override
public void searchScrollClear(String scrollId) {
client.prepareClearScroll().addScrollId(scrollId).execute().actionGet();
public void searchScrollClear(List<String> scrollIds) {
client.prepareClearScroll().setScrollIds(scrollIds).execute().actionGet();
}
@Override

View File

@ -16,11 +16,13 @@
package org.springframework.data.elasticsearch.core;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -38,12 +40,12 @@ abstract class StreamQueries {
*
* @param searchHits the initial hits
* @param continueScrollFunction function to continue scrolling applies to the current scrollId.
* @param clearScrollConsumer consumer to clear the scroll context by accepting the current scrollId.
* @param clearScrollConsumer consumer to clear the scroll context by accepting the scrollIds to clear.
* @param <T>
* @return the {@link SearchHitsIterator}.
*/
static <T> SearchHitsIterator<T> streamResults(SearchScrollHits<T> searchHits,
Function<String, SearchScrollHits<T>> continueScrollFunction, Consumer<String> clearScrollConsumer) {
Function<String, SearchScrollHits<T>> continueScrollFunction, Consumer<List<String>> clearScrollConsumer) {
Assert.notNull(searchHits, "searchHits must not be null.");
Assert.notNull(searchHits.getScrollId(), "scrollId of searchHits must not be null.");
@ -59,17 +61,17 @@ abstract class StreamQueries {
// As we couldn't retrieve single result with scroll, store current hits.
private volatile Iterator<SearchHit<T>> scrollHits = searchHits.iterator();
private volatile String scrollId = searchHits.getScrollId();
private volatile boolean continueScroll = scrollHits.hasNext();
private volatile ScrollState scrollState = new ScrollState(searchHits.getScrollId());
@Override
public void close() {
try {
clearScrollConsumer.accept(scrollId);
clearScrollConsumer.accept(scrollState.getScrollIds());
} finally {
scrollHits = null;
scrollId = null;
scrollState = null;
}
}
@ -102,9 +104,9 @@ abstract class StreamQueries {
}
if (!scrollHits.hasNext()) {
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollId);
SearchScrollHits<T> nextPage = continueScrollFunction.apply(scrollState.getScrollId());
scrollHits = nextPage.iterator();
scrollId = nextPage.getScrollId();
scrollState.updateScrollId(nextPage.getScrollId());
continueScroll = scrollHits.hasNext();
}
@ -127,6 +129,5 @@ abstract class StreamQueries {
}
// utility constructor
private StreamQueries() {
}
private StreamQueries() {}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.util;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
/**
* @author Peter-Josef Meisch
*/
class ScrollStateTest {
@Test // DATAES-817
void shouldReturnLastSetScrollId() {
ScrollState scrollState = new ScrollState();
scrollState.updateScrollId("id-1");
scrollState.updateScrollId("id-2");
assertThat(scrollState.getScrollId()).isEqualTo("id-2");
}
@Test
void shouldReturnUniqueListOfUsedScrollIdsInCorrectOrder() {
ScrollState scrollState = new ScrollState();
scrollState.updateScrollId("id-1");
scrollState.updateScrollId("id-2");
scrollState.updateScrollId("id-1");
scrollState.updateScrollId("id-3");
scrollState.updateScrollId("id-2");
assertThat(scrollState.getScrollIds()).isEqualTo(Arrays.asList("id-1", "id-2", "id-3"));
}
}

View File

@ -18,19 +18,17 @@ package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.search.aggregations.Aggregations;
import org.junit.jupiter.api.Test;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.lang.Nullable;
/**
* @author Sascha Woo
* @author Peter-Josef Meisch
*/
public class StreamQueriesTest {
@ -41,15 +39,15 @@ public class StreamQueriesTest {
List<SearchHit<String>> hits = new ArrayList<>();
hits.add(new SearchHit<String>(null, 0, null, null, "one"));
SearchScrollHits<String> searchHits = newSearchScrollHits(hits);
SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");
AtomicBoolean clearScrollCalled = new AtomicBoolean(false);
// when
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
searchHits, //
scrollId -> newSearchScrollHits(Collections.emptyList()), //
scrollId -> clearScrollCalled.set(true));
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollIds -> clearScrollCalled.set(true));
while (iterator.hasNext()) {
iterator.next();
@ -68,21 +66,47 @@ public class StreamQueriesTest {
List<SearchHit<String>> hits = new ArrayList<>();
hits.add(new SearchHit<String>(null, 0, null, null, "one"));
SearchScrollHits<String> searchHits = newSearchScrollHits(hits);
SearchScrollHits<String> searchHits = newSearchScrollHits(hits, "1234");
// when
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
searchHits, //
scrollId -> newSearchScrollHits(Collections.emptyList()), //
scrollId -> {
});
scrollId -> newSearchScrollHits(Collections.emptyList(), scrollId), //
scrollId -> {});
// then
assertThat(iterator.getTotalHits()).isEqualTo(1);
}
private SearchScrollHits<String> newSearchScrollHits(List<SearchHit<String>> hits) {
return new SearchHitsImpl<String>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, "1234", hits, null);
@Test // DATAES-817
void shouldClearAllScrollIds() {
SearchScrollHits<String> searchHits1 = newSearchScrollHits(
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-1");
SearchScrollHits<String> searchHits2 = newSearchScrollHits(
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
SearchScrollHits<String> searchHits3 = newSearchScrollHits(
Collections.singletonList(new SearchHit<String>(null, 0, null, null, "one")), "s-2");
SearchScrollHits<String> searchHits4 = newSearchScrollHits(Collections.emptyList(), "s-3");
Iterator<SearchScrollHits<String>> searchScrollHitsIterator = Arrays.asList(searchHits1, searchHits2, searchHits3,searchHits4).iterator();
List<String> clearedScrollIds = new ArrayList<>();
SearchHitsIterator<String> iterator = StreamQueries.streamResults( //
searchScrollHitsIterator.next(), //
scrollId -> searchScrollHitsIterator.next(), //
scrollIds -> clearedScrollIds.addAll(scrollIds));
while (iterator.hasNext()) {
iterator.next();
}
iterator.close();
assertThat(clearedScrollIds).isEqualTo(Arrays.asList("s-1", "s-2", "s-3"));
}
private SearchScrollHits<String> newSearchScrollHits(List<SearchHit<String>> hits, String scrollId) {
return new SearchHitsImpl<String>(hits.size(), TotalHitsRelation.EQUAL_TO, 0, scrollId, hits, null);
}
}