DATAES-603 - Add support for bulk options in Elasticsearch-Template classes.

Original pull request: #296.
This commit is contained in:
P.J.Meisch 2019-07-16 21:38:17 +02:00 committed by Mark Paluch
parent 4c743ee5b9
commit 8c7d5d47b1
4 changed files with 215 additions and 13 deletions

View File

@ -21,12 +21,11 @@ import java.util.stream.Collectors;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.Nullable;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.query.AliasQuery;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
import org.springframework.data.elasticsearch.core.query.GetQuery;
@ -36,6 +35,7 @@ import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
/**
* ElasticsearchOperations
@ -456,18 +456,40 @@ public interface ElasticsearchOperations {
UpdateResponse update(UpdateQuery updateQuery);
/**
* Bulk index all objects. Will do save or update
* Bulk index all objects. Will do save or update.
*
* @param queries
* @param queries the queries to execute in bulk
*/
void bulkIndex(List<IndexQuery> queries);
default void bulkIndex(List<IndexQuery> queries) {
bulkIndex(queries, BulkOptions.defaultOptions());
}
/**
* Bulk index all objects. Will do save or update.
*
* @param queries the queries to execute in bulk
* @param bulkOptions options to be added to the bulk request
* @since 3.2
*/
void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions);
/**
* Bulk update all objects. Will do update
*
* @param queries
* @param queries the queries to execute in bulk
*/
void bulkUpdate(List<UpdateQuery> queries);
default void bulkUpdate(List<UpdateQuery> queries) {
bulkUpdate(queries, BulkOptions.defaultOptions());
}
/**
* Bulk update all objects. Will do update
*
* @param queries the queries to execute in bulk
* @param bulkOptions options to be added to the bulk request
* @since 3.2
*/
void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions);
/**
* Delete the one object with provided id

View File

@ -61,7 +61,6 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -105,6 +104,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@ -139,6 +139,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
* @author Roman Puchkovskiy
* @author Martin Choraine
* @author Farid Azaza
* @author Peter-Josef Meisch
*/
public class ElasticsearchRestTemplate
implements ElasticsearchOperations, EsClient<RestHighLevelClient>, ApplicationContextAware {
@ -739,8 +740,9 @@ public class ElasticsearchRestTemplate
}
@Override
public void bulkIndex(List<IndexQuery> queries) {
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions) {
BulkRequest bulkRequest = new BulkRequest();
setBulkOptions(bulkRequest, bulkOptions);
for (IndexQuery query : queries) {
bulkRequest.add(prepareIndex(query));
}
@ -752,8 +754,9 @@ public class ElasticsearchRestTemplate
}
@Override
public void bulkUpdate(List<UpdateQuery> queries) {
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions) {
BulkRequest bulkRequest = new BulkRequest();
setBulkOptions(bulkRequest, bulkOptions);
for (UpdateQuery query : queries) {
bulkRequest.add(prepareUpdate(query));
}
@ -764,6 +767,34 @@ public class ElasticsearchRestTemplate
}
}
private void setBulkOptions(BulkRequest bulkRequest, BulkOptions bulkOptions) {
if (bulkOptions.getTimeout() != null) {
bulkRequest.timeout(bulkOptions.getTimeout());
}
if (bulkOptions.getRefreshPolicy() != null) {
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
}
if (bulkOptions.getWaitForActiveShards() != null) {
bulkRequest.waitForActiveShards(bulkOptions.getWaitForActiveShards());
}
if (bulkOptions.getPipeline() != null) {
bulkRequest.pipeline(bulkOptions.getPipeline());
}
if (bulkOptions.getRoutingId() != null) {
bulkRequest.routing(bulkOptions.getRoutingId());
}
}
private void checkForBulkUpdateFailure(BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<>();

View File

@ -52,7 +52,6 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -94,6 +93,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@ -635,8 +635,9 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient<
}
@Override
public void bulkIndex(List<IndexQuery> queries) {
public void bulkIndex(List<IndexQuery> queries, @Nullable BulkOptions bulkOptions) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
setBulkOptions(bulkRequest, bulkOptions);
for (IndexQuery query : queries) {
bulkRequest.add(prepareIndex(query));
}
@ -644,14 +645,43 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient<
}
@Override
public void bulkUpdate(List<UpdateQuery> queries) {
public void bulkUpdate(List<UpdateQuery> queries, @Nullable BulkOptions bulkOptions) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
setBulkOptions(bulkRequest, bulkOptions);
for (UpdateQuery query : queries) {
bulkRequest.add(prepareUpdate(query));
}
checkForBulkUpdateFailure(bulkRequest.execute().actionGet());
}
private void setBulkOptions(BulkRequestBuilder bulkRequest, BulkOptions bulkOptions) {
if (bulkOptions.getTimeout() != null) {
bulkRequest.setTimeout(bulkOptions.getTimeout());
}
if (bulkOptions.getRefreshPolicy() != null) {
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
}
if (bulkOptions.getWaitForActiveShards() != null) {
bulkRequest.setWaitForActiveShards(bulkOptions.getWaitForActiveShards());
}
if (bulkOptions.getPipeline() != null) {
bulkRequest.pipeline(bulkOptions.getPipeline());
}
if (bulkOptions.getRoutingId() != null) {
bulkRequest.routing(bulkOptions.getRoutingId());
}
}
private void checkForBulkUpdateFailure(BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<>();

View File

@ -0,0 +1,119 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import java.util.List;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
/**
* Options that may be passed to an
* {@link org.springframework.data.elasticsearch.core.ElasticsearchOperations#bulkIndex(List, BulkOptions)} or
* {@link org.springframework.data.elasticsearch.core.ElasticsearchOperations#bulkUpdate(List, BulkOptions)} call. <br/>
* Use {@link BulkOptions#builder()} to obtain a builder, then set the desired properties and call
* {@link BulkOptionsBuilder#build()} to get the BulkOptions object.
*
* @author Peter-Josef Meisch
* @since 3.2
*/
public class BulkOptions {
private static final BulkOptions defaultOptions = builder().build();
private final TimeValue timeout;
private final WriteRequest.RefreshPolicy refreshPolicy;
private final ActiveShardCount waitForActiveShards;
private final String pipeline;
private final String routingId;
public static BulkOptionsBuilder builder() {
return new BulkOptionsBuilder();
}
public static BulkOptions defaultOptions() {
return defaultOptions;
}
private BulkOptions(TimeValue timeout, WriteRequest.RefreshPolicy refreshPolicy, ActiveShardCount waitForActiveShards,
String pipeline, String routingId) {
this.timeout = timeout;
this.refreshPolicy = refreshPolicy;
this.waitForActiveShards = waitForActiveShards;
this.pipeline = pipeline;
this.routingId = routingId;
}
public TimeValue getTimeout() {
return timeout;
}
public WriteRequest.RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
public ActiveShardCount getWaitForActiveShards() {
return waitForActiveShards;
}
public String getPipeline() {
return pipeline;
}
public String getRoutingId() {
return routingId;
}
public static final class BulkOptionsBuilder {
private TimeValue timeout;
private WriteRequest.RefreshPolicy refreshPolicy;
private ActiveShardCount waitForActiveShards;
private String pipeline;
private String routingId;
private BulkOptionsBuilder() {}
public BulkOptionsBuilder withTimeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
public BulkOptionsBuilder withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
return this;
}
public BulkOptionsBuilder withWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
public BulkOptionsBuilder withPipeline(String pipeline) {
this.pipeline = pipeline;
return this;
}
public BulkOptionsBuilder withRoutingId(String routingId) {
this.routingId = routingId;
return this;
}
public BulkOptions build() {
return new BulkOptions(timeout, refreshPolicy, waitForActiveShards, pipeline, routingId);
}
}
}