diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index a1d987b97..5c7678e4b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -1313,7 +1313,7 @@ class RequestConverter { Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1); builder.scroll(time(scrollTimeout)); // limit the number of documents in a batch - builder.size(500); + builder.size(query.getReactiveBatchSize()); } if (!isEmpty(query.getIndicesBoost())) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java index fb362704a..e924438f4 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java @@ -47,6 +47,8 @@ import org.springframework.util.Assert; */ public class BaseQuery implements Query { + private static final int DEFAULT_REACTIVE_BATCH_SIZE = 500; + @Nullable protected Sort sort; protected Pageable pageable = DEFAULT_PAGE; protected List fields = new ArrayList<>(); @@ -75,6 +77,7 @@ public class BaseQuery implements Query { @Nullable protected PointInTime pointInTime; private boolean queryIsUpdatedByConverter = false; + @Nullable private Integer reactiveBatchSize = null; public BaseQuery() {} @@ -105,6 +108,7 @@ public class BaseQuery implements Query { this.requestCache = builder.getRequestCache(); this.idsWithRouting = builder.getIdsWithRouting(); this.pointInTime = builder.getPointInTime(); + this.reactiveBatchSize = builder.getReactiveBatchSize(); } @Override @@ -471,6 +475,7 @@ public class BaseQuery implements Query { /** * used internally. Not considered part of the API. + * * @since 5.0 */ public boolean queryIsUpdatedByConverter() { @@ -479,9 +484,22 @@ public class BaseQuery implements Query { /** * used internally. Not considered part of the API. + * * @since 5.0 */ public void setQueryIsUpdatedByConverter(boolean queryIsUpdatedByConverter) { this.queryIsUpdatedByConverter = queryIsUpdatedByConverter; } + + @Override + public Integer getReactiveBatchSize() { + return reactiveBatchSize != null ? reactiveBatchSize : DEFAULT_REACTIVE_BATCH_SIZE; + } + + /** + * @since 5.1 + */ + public void setReactiveBatchSize(Integer reactiveBatchSize) { + this.reactiveBatchSize = reactiveBatchSize; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java index 70fac56e0..6c9901d12 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQueryBuilder.java @@ -66,6 +66,8 @@ public abstract class BaseQueryBuilder runtimeFields = new ArrayList<>(); @Nullable protected Query.PointInTime pointInTime; + @Nullable Integer reactiveBatchSize; + @Nullable public Sort getSort() { return sort; @@ -191,6 +193,13 @@ public abstract class BaseQueryBuilder