From 12ddb74faebb122132b0b4079ede4daa735d9670 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Tue, 15 Jul 2025 18:28:51 +0200 Subject: [PATCH] Fix the calculation of the requested number of documents. Original Pull Request #3128 Closes #3127 Signed-off-by: Peter-Josef Meisch --- .../client/elc/RequestConverter.java | 42 +++---- .../elasticsearch/core/query/BaseQuery.java | 62 +++++++++- .../data/elasticsearch/core/query/Query.java | 7 ++ .../core/ElasticsearchIntegrationTests.java | 2 - .../core/query/BaseQueryTests.java | 106 ++++++++++++++++++ 5 files changed, 184 insertions(+), 35 deletions(-) create mode 100644 src/test/java/org/springframework/data/elasticsearch/core/query/BaseQueryTests.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index e1a6c0927..5dc3039c0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -120,9 +120,6 @@ class RequestConverter extends AbstractQueryProcessor { private static final Log LOGGER = LogFactory.getLog(RequestConverter.class); - // the default max result window size of Elasticsearch - public static final Integer INDEX_MAX_RESULT_WINDOW = 10_000; - protected final JsonpMapper jsonpMapper; protected final ElasticsearchConverter elasticsearchConverter; @@ -751,9 +748,9 @@ class RequestConverter extends AbstractQueryProcessor { } return co.elastic.clients.elasticsearch._types.Script.of(sb -> { sb.lang(scriptData.language()) - .params(params) - .id(scriptData.scriptName()); - if (scriptData.script() != null){ + .params(params) + .id(scriptData.scriptName()); + if (scriptData.script() != null) { sb.source(s -> s.scriptString(scriptData.script())); } return sb; @@ -927,7 +924,7 @@ class RequestConverter extends AbstractQueryProcessor { ReindexRequest.Script script = reindexRequest.getScript(); if (script != null) { builder.script(sb -> { - if (script.getSource() != null){ + if (script.getSource() != null) { sb.source(s -> s.scriptString(script.getSource())); } sb.lang(script.getLang()); @@ -1089,7 +1086,7 @@ class RequestConverter extends AbstractQueryProcessor { uqb.script(sb -> { sb.lang(query.getLang()).params(params); - if (query.getScript() != null){ + if (query.getScript() != null) { sb.source(s -> s.scriptString(query.getScript())); } sb.id(query.getId()); @@ -1257,8 +1254,8 @@ class RequestConverter extends AbstractQueryProcessor { .header(msearchHeaderBuilder(query, param.index(), routing)) .body(bb -> { bb.explain(query.getExplain()) // - .id(query.getId()); // - if (query.getSource() != null){ + .id(query.getId()); // + if (query.getSource() != null) { bb.source(s -> s.scriptString(query.getSource())); } @@ -1296,15 +1293,8 @@ class RequestConverter extends AbstractQueryProcessor { .timeout(timeStringMs(query.getTimeout())) // ; - var offset = query.getPageable().isPaged() ? query.getPageable().getOffset() : 0; - var pageSize = query.getPageable().isPaged() ? query.getPageable().getPageSize() - : INDEX_MAX_RESULT_WINDOW; - // if we have both a page size and a max results, we take the min, this is necessary for - // searchForStream to work correctly (#3098) as there the page size defines what is - // returned in a single request, and the max result determines the total number of - // documents returned - var size = query.isLimiting() ? Math.min(pageSize, query.getMaxResults()) : pageSize; - bb.from((int) offset).size(size); + bb.from((int) (query.getPageable().isPaged() ? query.getPageable().getOffset() : 0)) + .size(query.getRequestSize()); if (!isEmpty(query.getFields())) { bb.fields(fb -> { @@ -1476,14 +1466,8 @@ class RequestConverter extends AbstractQueryProcessor { builder.seqNoPrimaryTerm(true); } - var offset = query.getPageable().isPaged() ? query.getPageable().getOffset() : 0; - var pageSize = query.getPageable().isPaged() ? query.getPageable().getPageSize() : INDEX_MAX_RESULT_WINDOW; - // if we have both a page size and a max results, we take the min, this is necessary for - // searchForStream to work correctly (#3098) as there the page size defines what is - // returned in a single request, and the max result determines the total number of - // documents returned - var size = query.isLimiting() ? Math.min(pageSize, query.getMaxResults()) : pageSize; - builder.from((int) offset).size(size); + builder.from((int) (query.getPageable().isPaged() ? query.getPageable().getOffset() : 0)) + .size(query.getRequestSize()); if (!isEmpty(query.getFields())) { var fieldAndFormats = query.getFields().stream().map(field -> FieldAndFormat.of(b -> b.field(field))).toList(); @@ -1943,8 +1927,8 @@ class RequestConverter extends AbstractQueryProcessor { return PutScriptRequest.of(b -> b // .id(script.id()) // .script(sb -> sb // - .lang(script.language()) // - .source(s -> s.scriptString(script.source())))); + .lang(script.language()) // + .source(s -> s.scriptString(script.source())))); } public GetScriptRequest scriptGet(String name) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java index 1cb0190b0..62be46b83 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.jspecify.annotations.Nullable; +import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.util.Assert; @@ -47,10 +48,15 @@ import org.springframework.util.Assert; */ public class BaseQuery implements Query { + public static final int INDEX_MAX_RESULT_WINDOW = 10_000; + private static final int DEFAULT_REACTIVE_BATCH_SIZE = 500; + // the instance to mark the query pageable initial status, needed to distinguish between the initial + // value and a user-set unpaged value; values don't matter, the RequestConverter compares to the isntance. + private static final Pageable UNSET_PAGE = PageRequest.of(0, 1); @Nullable protected Sort sort; - protected Pageable pageable = DEFAULT_PAGE; + protected Pageable pageable = UNSET_PAGE; protected List fields = new ArrayList<>(); @Nullable protected List storedFields; @Nullable protected SourceFilter sourceFilter; @@ -78,7 +84,7 @@ public class BaseQuery implements Query { private boolean queryIsUpdatedByConverter = false; @Nullable private Integer reactiveBatchSize = null; @Nullable private Boolean allowNoIndices = null; - private EnumSet expandWildcards; + private EnumSet expandWildcards = EnumSet.noneOf(IndicesOptions.WildcardStates.class); private List docValueFields = new ArrayList<>(); private List scriptedFields = new ArrayList<>(); @@ -87,7 +93,7 @@ public class BaseQuery implements Query { public > BaseQuery(BaseQueryBuilder builder) { this.sort = builder.getSort(); // do a setPageable after setting the sort, because the pageable may contain an additional sort - this.setPageable(builder.getPageable() != null ? builder.getPageable() : DEFAULT_PAGE); + this.setPageable(builder.getPageable() != null ? builder.getPageable() : UNSET_PAGE); this.fields = builder.getFields(); this.storedFields = builder.getStoredFields(); this.sourceFilter = builder.getSourceFilter(); @@ -203,7 +209,7 @@ public class BaseQuery implements Query { @Override @SuppressWarnings("unchecked") public final T addSort(@Nullable Sort sort) { - if (sort == null) { + if (sort == null || sort.isUnsorted()) { return (T) this; } @@ -561,4 +567,52 @@ public class BaseQuery implements Query { public List getScriptedFields() { return scriptedFields; } + + @Override + public Integer getRequestSize() { + + var pageable = getPageable(); + Integer requestSize = null; + + if (pageable.isPaged() && pageable != UNSET_PAGE) { + // pagesize defined by the user + if (!isLimiting()) { + // no maxResults + requestSize = pageable.getPageSize(); + } else { + // if we have both a page size and a max results, we take the min, this is necessary for + // searchForStream to work correctly (#3098) as there the page size defines what is + // returned in a single request, and the max result determines the total number of + // documents returned. + requestSize = Math.min(pageable.getPageSize(), getMaxResults()); + } + } else if (pageable == UNSET_PAGE) { + // no user defined pageable + if (isLimiting()) { + // maxResults + requestSize = getMaxResults(); + } else { + requestSize = DEFAULT_PAGE_SIZE; + } + } else { + // explicitly set unpaged + if (!isLimiting()) { + // no maxResults + requestSize = INDEX_MAX_RESULT_WINDOW; + } else { + // if we have both a implicit page size and a max results, we take the min, this is necessary for + // searchForStream to work correctly (#3098) as there the page size defines what is + // returned in a single request, and the max result determines the total number of + // documents returned. + requestSize = Math.min(INDEX_MAX_RESULT_WINDOW, getMaxResults()); + } + } + + if (requestSize == null) { + // this should not happen + requestSize = DEFAULT_PAGE_SIZE; + } + + return requestSize; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java b/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java index ce3d445e1..876beff92 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/Query.java @@ -484,6 +484,13 @@ public interface Query { */ List getScriptedFields(); + /** + * @return the number of documents that should be requested from Elasticsearch in this query. Depends wether a + * Pageable and/or maxResult size is set on the query. + * @since 5.4.8 5.5.2 + */ + public Integer getRequestSize(); + /** * @since 4.3 */ diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java index 195b11b3d..0a716ea16 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java @@ -104,8 +104,6 @@ import org.springframework.data.util.StreamUtils; @SpringIntegrationTest public abstract class ElasticsearchIntegrationTests { - static final Integer INDEX_MAX_RESULT_WINDOW = 10_000; - private static final String MULTI_INDEX_PREFIX = "test-index"; private static final String MULTI_INDEX_ALL = MULTI_INDEX_PREFIX + "*"; private static final String MULTI_INDEX_1_NAME = MULTI_INDEX_PREFIX + "-1"; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/query/BaseQueryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/query/BaseQueryTests.java new file mode 100644 index 000000000..946d710f4 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/query/BaseQueryTests.java @@ -0,0 +1,106 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query; + +import static org.assertj.core.api.Assertions.*; +import static org.springframework.data.elasticsearch.core.query.BaseQuery.*; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.data.domain.Pageable; + +class BaseQueryTests { + + private static final String MATCH_ALL_QUERY = "{\"match_all\":{}}"; + + @Test // #3127 + @DisplayName("query with no Pageable and no maxResults requests 10 docs from 0") + void queryWithNoPageableAndNoMaxResultsRequests10DocsFrom0() { + + var query = StringQuery.builder(MATCH_ALL_QUERY) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(10); + } + + @Test // #3127 + @DisplayName("query with a Pageable and no MaxResults request with values from Pageable") + void queryWithAPageableAndNoMaxResultsRequestWithValuesFromPageable() { + var query = StringQuery.builder(MATCH_ALL_QUERY) + .withPageable(Pageable.ofSize(42)) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(42); + } + + @Test // #3127 + @DisplayName("query with no Pageable and maxResults requests maxResults") + void queryWithNoPageableAndMaxResultsRequestsMaxResults() { + + var query = StringQuery.builder(MATCH_ALL_QUERY) + .withMaxResults(12_345) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(12_345); + } + + @Test // #3127 + @DisplayName("query with Pageable and maxResults requests with values from Pageable if Pageable is less than maxResults") + void queryWithPageableAndMaxResultsRequestsWithValuesFromPageableIfPageableIsLessThanMaxResults() { + + var query = StringQuery.builder(MATCH_ALL_QUERY) + .withPageable(Pageable.ofSize(42)) + .withMaxResults(123) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(42); + } + + @Test // #3127 + @DisplayName("query with Pageable and maxResults requests with values from maxResults if Pageable is more than maxResults") + void queryWithPageableAndMaxResultsRequestsWithValuesFromMaxResultsIfPageableIsMoreThanMaxResults() { + + var query = StringQuery.builder(MATCH_ALL_QUERY) + .withPageable(Pageable.ofSize(420)) + .withMaxResults(123) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(123); + } + + @Test // #3127 + @DisplayName("query with explicit unpaged request and no maxResults requests max request window size") + void queryWithExplicitUnpagedRequestAndNoMaxResultsRequestsMaxRequestWindowSize() { + + var query = StringQuery.builder(MATCH_ALL_QUERY) + .withPageable(Pageable.unpaged()) + .build(); + + var requestSize = query.getRequestSize(); + + assertThat(requestSize).isEqualTo(INDEX_MAX_RESULT_WINDOW); + } +}