DATAES-603 - Polishing.

Add nullable annotations to nullable properties in BulkOptions. Add assertions to non-nullable method arguments. Reformat code. Make methods static where possible.

Original pull request: #296.
This commit is contained in:
Mark Paluch 2019-07-26 12:03:12 +02:00
parent 8c7d5d47b1
commit e23f6f1d33
3 changed files with 62 additions and 35 deletions

View File

@ -741,6 +741,10 @@ public class ElasticsearchRestTemplate
@Override
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions) {
Assert.notNull(queries, "List of IndexQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
BulkRequest bulkRequest = new BulkRequest();
setBulkOptions(bulkRequest, bulkOptions);
for (IndexQuery query : queries) {
@ -755,6 +759,10 @@ public class ElasticsearchRestTemplate
@Override
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions) {
Assert.notNull(queries, "List of UpdateQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
BulkRequest bulkRequest = new BulkRequest();
setBulkOptions(bulkRequest, bulkOptions);
for (UpdateQuery query : queries) {
@ -767,30 +775,25 @@ public class ElasticsearchRestTemplate
}
}
private void setBulkOptions(BulkRequest bulkRequest, BulkOptions bulkOptions) {
private static void setBulkOptions(BulkRequest bulkRequest, BulkOptions bulkOptions) {
if (bulkOptions.getTimeout() != null) {
bulkRequest.timeout(bulkOptions.getTimeout());
}
if (bulkOptions.getRefreshPolicy() != null) {
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
}
if (bulkOptions.getWaitForActiveShards() != null) {
bulkRequest.waitForActiveShards(bulkOptions.getWaitForActiveShards());
}
if (bulkOptions.getPipeline() != null) {
bulkRequest.pipeline(bulkOptions.getPipeline());
}
if (bulkOptions.getRoutingId() != null) {
bulkRequest.routing(bulkOptions.getRoutingId());
}
}

View File

@ -635,7 +635,11 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient<
}
@Override
public void bulkIndex(List<IndexQuery> queries, @Nullable BulkOptions bulkOptions) {
public void bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions) {
Assert.notNull(queries, "List of IndexQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
BulkRequestBuilder bulkRequest = client.prepareBulk();
setBulkOptions(bulkRequest, bulkOptions);
for (IndexQuery query : queries) {
@ -645,7 +649,11 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient<
}
@Override
public void bulkUpdate(List<UpdateQuery> queries, @Nullable BulkOptions bulkOptions) {
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions) {
Assert.notNull(queries, "List of UpdateQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
BulkRequestBuilder bulkRequest = client.prepareBulk();
setBulkOptions(bulkRequest, bulkOptions);
for (UpdateQuery query : queries) {
@ -654,30 +662,25 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, EsClient<
checkForBulkUpdateFailure(bulkRequest.execute().actionGet());
}
private void setBulkOptions(BulkRequestBuilder bulkRequest, BulkOptions bulkOptions) {
private static void setBulkOptions(BulkRequestBuilder bulkRequest, BulkOptions bulkOptions) {
if (bulkOptions.getTimeout() != null) {
bulkRequest.setTimeout(bulkOptions.getTimeout());
}
if (bulkOptions.getRefreshPolicy() != null) {
bulkRequest.setRefreshPolicy(bulkOptions.getRefreshPolicy());
}
if (bulkOptions.getWaitForActiveShards() != null) {
bulkRequest.setWaitForActiveShards(bulkOptions.getWaitForActiveShards());
}
if (bulkOptions.getPipeline() != null) {
bulkRequest.pipeline(bulkOptions.getPipeline());
}
if (bulkOptions.getRoutingId() != null) {
bulkRequest.routing(bulkOptions.getRoutingId());
}
}

View File

@ -19,6 +19,7 @@ import java.util.List;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -29,28 +30,21 @@ import org.elasticsearch.common.unit.TimeValue;
* {@link BulkOptionsBuilder#build()} to get the BulkOptions object.
*
* @author Peter-Josef Meisch
* @author Mark Paluch
* @since 3.2
*/
public class BulkOptions {
private static final BulkOptions defaultOptions = builder().build();
private final TimeValue timeout;
private final WriteRequest.RefreshPolicy refreshPolicy;
private final ActiveShardCount waitForActiveShards;
private final String pipeline;
private final String routingId;
private final @Nullable TimeValue timeout;
private final @Nullable WriteRequest.RefreshPolicy refreshPolicy;
private final @Nullable ActiveShardCount waitForActiveShards;
private final @Nullable String pipeline;
private final @Nullable String routingId;
public static BulkOptionsBuilder builder() {
return new BulkOptionsBuilder();
}
public static BulkOptions defaultOptions() {
return defaultOptions;
}
private BulkOptions(TimeValue timeout, WriteRequest.RefreshPolicy refreshPolicy, ActiveShardCount waitForActiveShards,
String pipeline, String routingId) {
private BulkOptions(@Nullable TimeValue timeout, @Nullable WriteRequest.RefreshPolicy refreshPolicy,
@Nullable ActiveShardCount waitForActiveShards, @Nullable String pipeline, @Nullable String routingId) {
this.timeout = timeout;
this.refreshPolicy = refreshPolicy;
this.waitForActiveShards = waitForActiveShards;
@ -58,32 +52,59 @@ public class BulkOptions {
this.routingId = routingId;
}
@Nullable
public TimeValue getTimeout() {
return timeout;
}
@Nullable
public WriteRequest.RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
@Nullable
public ActiveShardCount getWaitForActiveShards() {
return waitForActiveShards;
}
@Nullable
public String getPipeline() {
return pipeline;
}
@Nullable
public String getRoutingId() {
return routingId;
}
public static final class BulkOptionsBuilder {
private TimeValue timeout;
private WriteRequest.RefreshPolicy refreshPolicy;
private ActiveShardCount waitForActiveShards;
private String pipeline;
private String routingId;
/**
* Create a new {@link BulkOptionsBuilder} to build {@link BulkOptions}.
*
* @return a new {@link BulkOptionsBuilder} to build {@link BulkOptions}.
*/
public static BulkOptionsBuilder builder() {
return new BulkOptionsBuilder();
}
/**
* Return default {@link BulkOptions}.
*
* @return default {@link BulkOptions}.
*/
public static BulkOptions defaultOptions() {
return defaultOptions;
}
/**
* Builder for {@link BulkOptions}.
*/
public static class BulkOptionsBuilder {
private @Nullable TimeValue timeout;
private @Nullable WriteRequest.RefreshPolicy refreshPolicy;
private @Nullable ActiveShardCount waitForActiveShards;
private @Nullable String pipeline;
private @Nullable String routingId;
private BulkOptionsBuilder() {}