diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index e19449a8e..e14d14453 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -21,12 +21,11 @@ import java.util.stream.Collectors; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.common.Nullable; - import org.springframework.data.domain.Page; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.query.AliasQuery; +import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.CriteriaQuery; import org.springframework.data.elasticsearch.core.query.DeleteQuery; import org.springframework.data.elasticsearch.core.query.GetQuery; @@ -36,6 +35,7 @@ import org.springframework.data.elasticsearch.core.query.SearchQuery; import org.springframework.data.elasticsearch.core.query.StringQuery; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.util.CloseableIterator; +import org.springframework.lang.Nullable; /** * ElasticsearchOperations @@ -456,18 +456,40 @@ public interface ElasticsearchOperations { UpdateResponse update(UpdateQuery updateQuery); /** - * Bulk index all objects. Will do save or update + * Bulk index all objects. Will do save or update. * - * @param queries + * @param queries the queries to execute in bulk */ - void bulkIndex(List queries); + default void bulkIndex(List queries) { + bulkIndex(queries, BulkOptions.defaultOptions()); + } + + /** + * Bulk index all objects. Will do save or update. + * + * @param queries the queries to execute in bulk + * @param bulkOptions options to be added to the bulk request + * @since 3.2 + */ + void bulkIndex(List queries, BulkOptions bulkOptions); /** * Bulk update all objects. Will do update * - * @param queries + * @param queries the queries to execute in bulk */ - void bulkUpdate(List queries); + default void bulkUpdate(List queries) { + bulkUpdate(queries, BulkOptions.defaultOptions()); + } + + /** + * Bulk update all objects. Will do update + * + * @param queries the queries to execute in bulk + * @param bulkOptions options to be added to the bulk request + * @since 3.2 + */ + void bulkUpdate(List queries, BulkOptions bulkOptions); /** * Delete the one object with provided id diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 1d4798f52..5e814855b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -61,7 +61,6 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -105,6 +104,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.util.CloseableIterator; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -139,6 +139,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; * @author Roman Puchkovskiy * @author Martin Choraine * @author Farid Azaza + * @author Peter-Josef Meisch */ public class ElasticsearchRestTemplate implements ElasticsearchOperations, EsClient, ApplicationContextAware { @@ -739,8 +740,9 @@ public class ElasticsearchRestTemplate } @Override - public void bulkIndex(List queries) { + public void bulkIndex(List queries, BulkOptions bulkOptions) { BulkRequest bulkRequest = new BulkRequest(); + setBulkOptions(bulkRequest, bulkOptions); for (IndexQuery query : queries) { bulkRequest.add(prepareIndex(query)); } @@ -752,8 +754,9 @@ public class ElasticsearchRestTemplate } @Override - public void bulkUpdate(List queries) { + public void bulkUpdate(List queries, BulkOptions bulkOptions) { BulkRequest bulkRequest = new BulkRequest(); + setBulkOptions(bulkRequest, bulkOptions); for (UpdateQuery query : queries) { bulkRequest.add(prepareUpdate(query)); } @@ -764,6 +767,34 @@ public class ElasticsearchRestTemplate } } + private void setBulkOptions(BulkRequest bulkRequest, BulkOptions bulkOptions) { + + if (bulkOptions.getTimeout() != null) { + + bulkRequest.timeout(bulkOptions.getTimeout()); + } + + if (bulkOptions.getRefreshPolicy() != null) { + + bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy()); + } + + if (bulkOptions.getWaitForActiveShards() != null) { + + bulkRequest.waitForActiveShards(bulkOptions.getWaitForActiveShards()); + } + + if (bulkOptions.getPipeline() != null) { + + bulkRequest.pipeline(bulkOptions.getPipeline()); + } + + if (bulkOptions.getRoutingId() != null) { + + bulkRequest.routing(bulkOptions.getRoutingId()); + } + } + private void checkForBulkUpdateFailure(BulkResponse bulkResponse) { if (bulkResponse.hasFailures()) { Map failedDocuments = new HashMap<>(); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index e471faac5..0ab18c093 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -52,7 +52,6 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -94,6 +93,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.util.CloseableIterator; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -635,8 +635,9 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient< } @Override - public void bulkIndex(List queries) { + public void bulkIndex(List queries, @Nullable BulkOptions bulkOptions) { BulkRequestBuilder bulkRequest = client.prepareBulk(); + setBulkOptions(bulkRequest, bulkOptions); for (IndexQuery query : queries) { bulkRequest.add(prepareIndex(query)); } @@ -644,14 +645,43 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient< } @Override - public void bulkUpdate(List queries) { + public void bulkUpdate(List queries, @Nullable BulkOptions bulkOptions) { BulkRequestBuilder bulkRequest = client.prepareBulk(); + setBulkOptions(bulkRequest, bulkOptions); for (UpdateQuery query : queries) { bulkRequest.add(prepareUpdate(query)); } checkForBulkUpdateFailure(bulkRequest.execute().actionGet()); } + private void setBulkOptions(BulkRequestBuilder bulkRequest, BulkOptions bulkOptions) { + + if (bulkOptions.getTimeout() != null) { + + bulkRequest.setTimeout(bulkOptions.getTimeout()); + } + + if (bulkOptions.getRefreshPolicy() != null) { + + bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy()); + } + + if (bulkOptions.getWaitForActiveShards() != null) { + + bulkRequest.setWaitForActiveShards(bulkOptions.getWaitForActiveShards()); + } + + if (bulkOptions.getPipeline() != null) { + + bulkRequest.pipeline(bulkOptions.getPipeline()); + } + + if (bulkOptions.getRoutingId() != null) { + + bulkRequest.routing(bulkOptions.getRoutingId()); + } + } + private void checkForBulkUpdateFailure(BulkResponse bulkResponse) { if (bulkResponse.hasFailures()) { Map failedDocuments = new HashMap<>(); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BulkOptions.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BulkOptions.java new file mode 100644 index 000000000..b79c542f8 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BulkOptions.java @@ -0,0 +1,119 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.query; + +import java.util.List; + +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Options that may be passed to an + * {@link org.springframework.data.elasticsearch.core.ElasticsearchOperations#bulkIndex(List, BulkOptions)} or + * {@link org.springframework.data.elasticsearch.core.ElasticsearchOperations#bulkUpdate(List, BulkOptions)} call.
+ * Use {@link BulkOptions#builder()} to obtain a builder, then set the desired properties and call + * {@link BulkOptionsBuilder#build()} to get the BulkOptions object. + * + * @author Peter-Josef Meisch + * @since 3.2 + */ +public class BulkOptions { + + private static final BulkOptions defaultOptions = builder().build(); + + private final TimeValue timeout; + private final WriteRequest.RefreshPolicy refreshPolicy; + private final ActiveShardCount waitForActiveShards; + private final String pipeline; + private final String routingId; + + public static BulkOptionsBuilder builder() { + return new BulkOptionsBuilder(); + } + + public static BulkOptions defaultOptions() { + return defaultOptions; + } + + private BulkOptions(TimeValue timeout, WriteRequest.RefreshPolicy refreshPolicy, ActiveShardCount waitForActiveShards, + String pipeline, String routingId) { + this.timeout = timeout; + this.refreshPolicy = refreshPolicy; + this.waitForActiveShards = waitForActiveShards; + this.pipeline = pipeline; + this.routingId = routingId; + } + + public TimeValue getTimeout() { + return timeout; + } + + public WriteRequest.RefreshPolicy getRefreshPolicy() { + return refreshPolicy; + } + + public ActiveShardCount getWaitForActiveShards() { + return waitForActiveShards; + } + + public String getPipeline() { + return pipeline; + } + + public String getRoutingId() { + return routingId; + } + + public static final class BulkOptionsBuilder { + private TimeValue timeout; + private WriteRequest.RefreshPolicy refreshPolicy; + private ActiveShardCount waitForActiveShards; + private String pipeline; + private String routingId; + + private BulkOptionsBuilder() {} + + public BulkOptionsBuilder withTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + public BulkOptionsBuilder withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + + public BulkOptionsBuilder withWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + + public BulkOptionsBuilder withPipeline(String pipeline) { + this.pipeline = pipeline; + return this; + } + + public BulkOptionsBuilder withRoutingId(String routingId) { + this.routingId = routingId; + return this; + } + + public BulkOptions build() { + return new BulkOptions(timeout, refreshPolicy, waitForActiveShards, pipeline, routingId); + } + } +}