Add reactive search batch size option to query.

Original Pull Request #2392
Closes #2061
This commit is contained in:
Peter-Josef Meisch 2022-12-07 13:07:05 +01:00 committed by GitHub
parent 9446d726bc
commit 014aa3dbf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 1 deletions

View File

@ -1313,7 +1313,7 @@ class RequestConverter {
Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1); Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1);
builder.scroll(time(scrollTimeout)); builder.scroll(time(scrollTimeout));
// limit the number of documents in a batch // limit the number of documents in a batch
builder.size(500); builder.size(query.getReactiveBatchSize());
} }
if (!isEmpty(query.getIndicesBoost())) { if (!isEmpty(query.getIndicesBoost())) {

View File

@ -47,6 +47,8 @@ import org.springframework.util.Assert;
*/ */
public class BaseQuery implements Query { public class BaseQuery implements Query {
private static final int DEFAULT_REACTIVE_BATCH_SIZE = 500;
@Nullable protected Sort sort; @Nullable protected Sort sort;
protected Pageable pageable = DEFAULT_PAGE; protected Pageable pageable = DEFAULT_PAGE;
protected List<String> fields = new ArrayList<>(); protected List<String> fields = new ArrayList<>();
@ -75,6 +77,7 @@ public class BaseQuery implements Query {
@Nullable protected PointInTime pointInTime; @Nullable protected PointInTime pointInTime;
private boolean queryIsUpdatedByConverter = false; private boolean queryIsUpdatedByConverter = false;
@Nullable private Integer reactiveBatchSize = null;
public BaseQuery() {} public BaseQuery() {}
@ -105,6 +108,7 @@ public class BaseQuery implements Query {
this.requestCache = builder.getRequestCache(); this.requestCache = builder.getRequestCache();
this.idsWithRouting = builder.getIdsWithRouting(); this.idsWithRouting = builder.getIdsWithRouting();
this.pointInTime = builder.getPointInTime(); this.pointInTime = builder.getPointInTime();
this.reactiveBatchSize = builder.getReactiveBatchSize();
} }
@Override @Override
@ -471,6 +475,7 @@ public class BaseQuery implements Query {
/** /**
* used internally. Not considered part of the API. * used internally. Not considered part of the API.
*
* @since 5.0 * @since 5.0
*/ */
public boolean queryIsUpdatedByConverter() { public boolean queryIsUpdatedByConverter() {
@ -479,9 +484,22 @@ public class BaseQuery implements Query {
/** /**
* used internally. Not considered part of the API. * used internally. Not considered part of the API.
*
* @since 5.0 * @since 5.0
*/ */
public void setQueryIsUpdatedByConverter(boolean queryIsUpdatedByConverter) { public void setQueryIsUpdatedByConverter(boolean queryIsUpdatedByConverter) {
this.queryIsUpdatedByConverter = queryIsUpdatedByConverter; this.queryIsUpdatedByConverter = queryIsUpdatedByConverter;
} }
@Override
public Integer getReactiveBatchSize() {
return reactiveBatchSize != null ? reactiveBatchSize : DEFAULT_REACTIVE_BATCH_SIZE;
}
/**
* @since 5.1
*/
public void setReactiveBatchSize(Integer reactiveBatchSize) {
this.reactiveBatchSize = reactiveBatchSize;
}
} }

View File

@ -66,6 +66,8 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
protected final List<RuntimeField> runtimeFields = new ArrayList<>(); protected final List<RuntimeField> runtimeFields = new ArrayList<>();
@Nullable protected Query.PointInTime pointInTime; @Nullable protected Query.PointInTime pointInTime;
@Nullable Integer reactiveBatchSize;
@Nullable @Nullable
public Sort getSort() { public Sort getSort() {
return sort; return sort;
@ -191,6 +193,13 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return pointInTime; return pointInTime;
} }
/**
* @since 5.1
*/
public Integer getReactiveBatchSize() {
return reactiveBatchSize;
}
public SELF withPageable(Pageable pageable) { public SELF withPageable(Pageable pageable) {
this.pageable = pageable; this.pageable = pageable;
return self(); return self();
@ -375,6 +384,14 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return self(); return self();
} }
/**
* @since 5.1
*/
public SELF withReactiveBatchSize(@Nullable Integer reactiveBatchSize) {
this.reactiveBatchSize = reactiveBatchSize;
return self();
}
public abstract Q build(); public abstract Q build();
private SELF self() { private SELF self() {

View File

@ -448,6 +448,17 @@ public interface Query {
return null; return null;
}; };
/**
* returns the number of documents that are requested when the reactive code does a batched search operation. This is
* the case when a query has no limit and no Pageable set.
*
* @return the batch size, defaults to 500 in {@link BaseQuery}
* @since 5.1
*/
default Integer getReactiveBatchSize() {
return 500;
}
/** /**
* @since 4.3 * @since 4.3
*/ */