diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 7c5803e19c3..12f1d7b5a6e 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -681,6 +681,7 @@ + @@ -799,7 +800,6 @@ - @@ -1025,7 +1025,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java b/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java deleted file mode 100644 index 0813e85960f..00000000000 --- a/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action; - - -/** - * Write Consistency Level control how many replicas should be active for a write operation to occur (a write operation - * can be index, or delete). - * - * - */ -public enum WriteConsistencyLevel { - DEFAULT((byte) 0), - ONE((byte) 1), - QUORUM((byte) 2), - ALL((byte) 3); - - private final byte id; - - WriteConsistencyLevel(byte id) { - this.id = id; - } - - public byte id() { - return id; - } - - public static WriteConsistencyLevel fromId(byte value) { - if (value == 0) { - return DEFAULT; - } else if (value == 1) { - return ONE; - } else if (value == 2) { - return QUORUM; - } else if (value == 3) { - return ALL; - } - throw new IllegalArgumentException("No write consistency match [" + value + "]"); - } - - public static WriteConsistencyLevel fromString(String value) { - if (value.equals("default")) { - return DEFAULT; - } else if (value.equals("one")) { - return ONE; - } else if (value.equals("quorum")) { - return QUORUM; - } else if (value.equals("all")) { - return ALL; - } - throw new IllegalArgumentException("No write consistency match [" + value + "]"); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 82fb6d70ca4..a6213c4f925 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -70,7 +70,7 @@ public class TransportShardFlushAction extends TransportReplicationAction implements Composite List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; - private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; private long sizeInBytes = 0; @@ -432,15 +433,16 @@ public class BulkRequest extends ActionRequest implements Composite } /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ - public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; + public BulkRequest waitForActiveShards(ActiveShardCount activeShardCount) { + this.activeShardCount = activeShardCount; return this; } - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; + public ActiveShardCount waitForActiveShards() { + return this.activeShardCount; } @Override @@ -525,7 +527,7 @@ public class BulkRequest extends ActionRequest implements Composite @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); + activeShardCount = ActiveShardCount.readFrom(in); int size = in.readVInt(); for (int i = 0; i < size; i++) { byte type = in.readByte(); @@ -550,7 +552,7 @@ public class BulkRequest extends ActionRequest implements Composite @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(consistencyLevel.id()); + activeShardCount.writeTo(out); out.writeVInt(requests.size()); for (ActionRequest request : requests) { if (request instanceof IndexRequest) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java index 4f2b7aa702e..cd479d2c89f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java @@ -20,12 +20,13 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -111,10 +112,11 @@ public class BulkRequestBuilder extends ActionRequestBuilder requests = entry.getValue(); BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), requests.toArray(new BulkItemRequest[requests.size()])); - bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel()); + bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); bulkShardRequest.timeout(bulkRequest.timeout()); if (task != null) { bulkShardRequest.setParentTask(nodeId, task.getId()); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 87d45f6ccd1..cf7d2cf1e54 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -163,7 +163,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } else { BulkRequest modifiedBulkRequest = new BulkRequest(); modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy()); - modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel()); + modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); modifiedBulkRequest.timeout(bulkRequest.timeout()); int slot = 0; diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java index 90bd0450afb..deccc841a5c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java @@ -155,7 +155,7 @@ public final class ActiveShardCount implements Writeable { return false; } for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { - if (enoughShardsActive(shardRouting.value, indexMetaData) == false) { + if (enoughShardsActive(shardRouting.value, indexMetaData).isEnoughShardsActive() == false) { // not enough active shard copies yet return false; } @@ -167,12 +167,10 @@ public final class ActiveShardCount implements Writeable { * Returns true iff the active shard count in the shard routing table is enough * to meet the required shard count represented by this instance. */ - public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { - if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) { - // not enough active shard copies yet - return false; - } - return true; + public EvalResult enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { + final int totalActive = shardRoutingTable.activeShards().size(); + final int totalRequired = resolve(indexMetaData); + return new EvalResult(shardRoutingTable.activeShards().size() >= resolve(indexMetaData), totalActive, totalRequired); } @Override @@ -194,18 +192,41 @@ public final class ActiveShardCount implements Writeable { @Override public String toString() { - final String valStr; switch (value) { case ALL_ACTIVE_SHARDS: - valStr = "ALL"; - break; + return "ALL"; case ACTIVE_SHARD_COUNT_DEFAULT: - valStr = "DEFAULT"; - break; + return "DEFAULT"; default: - valStr = Integer.toString(value); + return Integer.toString(value); + } + } + + /** + * The result of the evaluation of the active shard copy count against a shard routing table. + */ + public static final class EvalResult { + private final boolean enoughShardsActive; + private final int totalActive; + private final int totalRequired; + + private EvalResult(boolean enoughShardsActive, int totalActive, int totalRequired) { + this.enoughShardsActive = enoughShardsActive; + this.totalActive = totalActive; + this.totalRequired = totalRequired; + } + + public boolean isEnoughShardsActive() { + return enoughShardsActive; + } + + public int getTotalActive() { + return totalActive; + } + + public int getTotalRequired() { + return totalRequired; } - return "ActiveShardCount[" + valStr + "]"; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index dc7846a74de..253eace4fd2 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -22,9 +22,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -68,7 +69,7 @@ public class ReplicationOperation< private final AtomicInteger pendingShards = new AtomicInteger(); private final AtomicInteger successfulShards = new AtomicInteger(); private final boolean executeOnReplicas; - private final boolean checkWriteConsistency; + private final boolean checkActiveShardCount; private final Primary primary; private final Replicas replicasProxy; private final AtomicBoolean finished = new AtomicBoolean(); @@ -80,10 +81,10 @@ public class ReplicationOperation< public ReplicationOperation(Request request, Primary primary, ActionListener listener, - boolean executeOnReplicas, boolean checkWriteConsistency, + boolean executeOnReplicas, boolean checkActiveShardCount, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - this.checkWriteConsistency = checkWriteConsistency; + this.checkActiveShardCount = checkActiveShardCount; this.executeOnReplicas = executeOnReplicas; this.replicasProxy = replicas; this.primary = primary; @@ -95,12 +96,12 @@ public class ReplicationOperation< } public void execute() throws Exception { - final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null; + final String activeShardCountFailure = checkActiveShardCount ? checkActiveShardCount() : null; final ShardRouting primaryRouting = primary.routingEntry(); final ShardId primaryId = primaryRouting.shardId(); - if (writeConsistencyFailure != null) { + if (activeShardCountFailure != null) { finishAsFailed(new UnavailableShardsException(primaryId, - "{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request)); + "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request)); return; } @@ -190,46 +191,35 @@ public class ReplicationOperation< } /** - * checks whether we can perform a write based on the write consistency setting - * returns **null* if OK to proceed, or a string describing the reason to stop + * Checks whether we can perform a write based on the required active shard count setting. + * Returns **null* if OK to proceed, or a string describing the reason to stop */ - String checkWriteConsistency() { - assert request.consistencyLevel() != WriteConsistencyLevel.DEFAULT : "consistency level should be set"; + String checkActiveShardCount() { final ShardId shardId = primary.routingEntry().shardId(); + final String indexName = shardId.getIndexName(); final ClusterState state = clusterStateSupplier.get(); - final WriteConsistencyLevel consistencyLevel = request.consistencyLevel(); - final int sizeActive; - final int requiredNumber; - IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName()); - if (indexRoutingTable != null) { - IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); - if (shardRoutingTable != null) { - sizeActive = shardRoutingTable.activeShards().size(); - if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) { - // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, - // quorum is 1 (which is what it is initialized to) - requiredNumber = (shardRoutingTable.getSize() / 2) + 1; - } else if (consistencyLevel == WriteConsistencyLevel.ALL) { - requiredNumber = shardRoutingTable.getSize(); - } else { - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; + final ActiveShardCount waitForActiveShards = request.waitForActiveShards(); + IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName); + if (indexRoutingTable == null) { + logger.trace("[{}] index not found in the routing table", shardId); + return "Index " + indexName + " not found in the routing table"; } + IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); + if (shardRoutingTable == null) { + logger.trace("[{}] shard not found in the routing table", shardId); + return "Shard " + shardId + " not found in the routing table"; + } + IndexMetaData indexMetaData = state.getMetaData().index(indexName); + assert indexMetaData != null; + ActiveShardCount.EvalResult result = waitForActiveShards.enoughShardsActive(shardRoutingTable, indexMetaData); - if (sizeActive < requiredNumber) { - logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." + - " op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request); - return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " - + requiredNumber + ")."; - } else { + if (result.isEnoughShardsActive()) { return null; + } else { + logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " + + "request [{}]", shardId, waitForActiveShards, result.getTotalActive(), result.getTotalRequired(), opType, request); + return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " + result.getTotalActive() + + ", needed " + result.getTotalRequired() + ")."; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index adb44dd4964..1de10675f96 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -22,9 +22,9 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -60,7 +60,10 @@ public abstract class ReplicationRequest listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, - executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName + executeOnReplicas, checkActiveShardCount(), replicasProxy, clusterService::state, logger, actionName ); } } @@ -566,8 +567,8 @@ public abstract class TransportReplicationAction< } // resolve all derived request fields, so we can route and apply it - if (request.consistencyLevel() == WriteConsistencyLevel.DEFAULT) { - request.consistencyLevel(defaultWriteConsistencyLevel); + if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) { + request.waitForActiveShards(defaultWaitForActiveShards); } resolveRequest(state.metaData(), indexMetaData, request); assert request.shardId() != null : "request shardId must be set in resolveRequest"; diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 03600461599..919a3cb90b4 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -131,7 +131,7 @@ public class UpdateHelper extends AbstractComponent { .setRefreshPolicy(request.getRefreshPolicy()) .routing(request.routing()) .parent(request.parent()) - .consistencyLevel(request.consistencyLevel()); + .waitForActiveShards(request.waitForActiveShards()); if (request.versionType() != VersionType.INTERNAL) { // in all but the internal versioning mode, we want to create the new document using the given version. indexRequest.version(request.version()).versionType(request.versionType()); @@ -224,14 +224,14 @@ public class UpdateHelper extends AbstractComponent { final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) .version(updateVersion).versionType(request.versionType()) - .consistencyLevel(request.consistencyLevel()) + .waitForActiveShards(request.waitForActiveShards()) .timestamp(timestamp).ttl(ttl) .setRefreshPolicy(request.getRefreshPolicy()); return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } else if ("delete".equals(operation)) { DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .version(updateVersion).versionType(request.versionType()) - .consistencyLevel(request.consistencyLevel()) + .waitForActiveShards(request.waitForActiveShards()) .setRefreshPolicy(request.getRefreshPolicy()); return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 43218dc44fa..84c6d75c908 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -21,9 +21,10 @@ package org.elasticsearch.action.update; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocumentRequest; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -74,7 +75,7 @@ public class UpdateRequest extends InstanceShardOperationRequest private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; - private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT; private IndexRequest upsertRequest; @@ -433,15 +434,16 @@ public class UpdateRequest extends InstanceShardOperationRequest return refreshPolicy; } - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; + public ActiveShardCount waitForActiveShards() { + return this.activeShardCount; } /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ - public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; + public UpdateRequest waitForActiveShards(ActiveShardCount activeShardCount) { + this.activeShardCount = activeShardCount; return this; } @@ -703,7 +705,7 @@ public class UpdateRequest extends InstanceShardOperationRequest @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); + activeShardCount = ActiveShardCount.readFrom(in); type = in.readString(); id = in.readString(); routing = in.readOptionalString(); @@ -738,7 +740,7 @@ public class UpdateRequest extends InstanceShardOperationRequest @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(consistencyLevel.id()); + activeShardCount.writeTo(out); out.writeString(type); out.writeString(id); out.writeOptionalString(routing); diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 403f4265fcd..90f066f0a7e 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -19,9 +19,10 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.bytes.BytesReference; @@ -122,10 +123,11 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder { Map groups = s.getAsGroups(); diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index bbbe3b80cd3..82d1414e347 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -20,6 +20,7 @@ package org.elasticsearch.index; import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.logging.ESLogger; @@ -128,6 +129,16 @@ public final class IndexSettings { public static final Setting MAX_SLICES_PER_SCROLL = Setting.intSetting("index.max_slices_per_scroll", 1024, 1, Property.Dynamic, Property.IndexScope); + /** + * The number of active shard copies required for a write operation. + */ + public static final Setting WAIT_FOR_ACTIVE_SHARDS_SETTING = + new Setting<>("index.write.wait_for_active_shards", + "1", + ActiveShardCount::parseString, + Setting.Property.Dynamic, + Setting.Property.IndexScope); + private final Index index; private final Version version; private final ESLogger logger; diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 623af6d2f47..a5a9ec40344 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -19,11 +19,11 @@ package org.elasticsearch.rest.action.bulk; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; @@ -79,9 +79,9 @@ public class RestBulkAction extends BaseRestHandler { String defaultPipeline = request.param("pipeline"); String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null; - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 2f9b10096cc..869dca8dce0 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.delete; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -55,9 +55,9 @@ public class RestDeleteAction extends BaseRestHandler { deleteRequest.version(RestActions.parseVersion(request)); deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType())); - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + deleteRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index a4222adacd7..ef5f02d4b16 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.index; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -98,9 +98,9 @@ public class RestIndexAction extends BaseRestHandler { } } } - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index cfe26d35c56..5fbdd29f963 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.update; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; @@ -53,9 +53,9 @@ public class RestUpdateAction extends BaseRestHandler { updateRequest.parent(request.param("parent")); updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout())); updateRequest.setRefreshPolicy(request.param("refresh")); - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + updateRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } updateRequest.docAsUpsert(request.paramAsBoolean("doc_as_upsert", updateRequest.docAsUpsert())); String sField = request.param("fields"); diff --git a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java similarity index 72% rename from core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java rename to core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java index 067f7e11530..13b21ff4280 100644 --- a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java @@ -17,10 +17,9 @@ * under the License. */ -package org.elasticsearch.consistencylevel; +package org.elasticsearch.action.support; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -35,26 +34,26 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.hamcrest.Matchers.equalTo; /** - * + * Tests setting the active shard count for replication operations (e.g. index) operates correctly. */ -public class WriteConsistencyLevelIT extends ESIntegTestCase { - public void testWriteConsistencyLevelReplication2() throws Exception { +public class WaitActiveShardCountIT extends ESIntegTestCase { + public void testReplicationWaitsForActiveShardCount() throws Exception { CreateIndexResponse createIndexResponse = - prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)) - .get(); + prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)).get(); assertAcked(createIndexResponse); - // indexing, by default, will work (ONE consistency level) - client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).setConsistencyLevel(WriteConsistencyLevel.ONE).execute().actionGet(); + // indexing, by default, will work (waiting for one shard copy only) + client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).execute().actionGet(); + ActiveShardCount activeShardCount = ActiveShardCount.from(2); // wait for two active shard copies try { client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.QUORUM) + .setWaitForActiveShards(activeShardCount) .setTimeout(timeValueMillis(100)).execute().actionGet(); - fail("can't index, does not match consistency"); + fail("can't index, does not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + activeShardCount + "] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); // but really, all is well } @@ -71,19 +70,19 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - // this should work, since we now have + // this should work, since we now have two client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.QUORUM) + .setWaitForActiveShards(activeShardCount) .setTimeout(timeValueSeconds(1)).execute().actionGet(); try { client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.ALL) + .setWaitForActiveShards(ActiveShardCount.ALL) .setTimeout(timeValueMillis(100)).execute().actionGet(); - fail("can't index, does not match consistency"); + fail("can't index, not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); // but really, all is well } @@ -93,9 +92,9 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - // this should work, since we now have + // this should work, since we now have all shards started client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.ALL) + .setWaitForActiveShards(ActiveShardCount.ALL) .setTimeout(timeValueSeconds(1)).execute().actionGet(); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9f41f0e37c2..ed43315a414 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -20,12 +20,14 @@ package org.elasticsearch.action.support.replication; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -33,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardState; @@ -251,34 +254,20 @@ public class ReplicationOperationTests extends ESTestCase { assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); } - public void testWriteConsistency() throws Exception { + public void testWaitForActiveShards() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); final int assignedReplicas = randomInt(2); final int unassignedReplicas = randomInt(2); final int totalShards = 1 + assignedReplicas + unassignedReplicas; - final boolean passesWriteConsistency; - Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); - switch (request.consistencyLevel()) { - case ONE: - passesWriteConsistency = true; - break; - case DEFAULT: - case QUORUM: - if (totalShards <= 2) { - passesWriteConsistency = true; // primary is enough - } else { - passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1; - } - // we have to reset default (as the transport replication action will do) - request.consistencyLevel(WriteConsistencyLevel.QUORUM); - break; - case ALL: - passesWriteConsistency = unassignedReplicas == 0; - break; - default: - throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]"); - } + final IndexMetaData indexMetaData = IndexMetaData.builder(index) + .settings(Settings.builder().put("index.version.created", Version.CURRENT.id)) + .numberOfReplicas(assignedReplicas + unassignedReplicas) + .numberOfShards(randomIntBetween(1, 5)) + .build(); + Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(randomIntBetween(0, totalShards))); + final boolean passesActiveShardCheck = request.waitForActiveShards().resolve(indexMetaData) <= assignedReplicas + 1; + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; for (int i = 0; i < assignedReplicas; i++) { replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); @@ -288,10 +277,10 @@ public class ReplicationOperationTests extends ESTestCase { } final ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates); - logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." + + logger.debug("using active shard count of [{}], assigned shards [{}], total shards [{}]." + " expecting op to [{}]. using state: \n{}", - request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, - passesWriteConsistency ? "succeed" : "retry", + request.waitForActiveShards(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, + passesActiveShardCheck ? "succeed" : "retry", state.prettyPrint()); final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id()); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); @@ -301,15 +290,15 @@ public class ReplicationOperationTests extends ESTestCase { new TestPrimary(primaryShard, primaryTerm), listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test"); - if (passesWriteConsistency) { - assertThat(op.checkWriteConsistency(), nullValue()); + if (passesActiveShardCheck) { + assertThat(op.checkActiveShardCount(), nullValue()); op.execute(); - assertTrue("operations should have been performed, consistency level is met", + assertTrue("operations should have been performed, active shard count is met", request.processedOnPrimary.get()); } else { - assertThat(op.checkWriteConsistency(), notNullValue()); + assertThat(op.checkActiveShardCount(), notNullValue()); op.execute(); - assertFalse("operations should not have been perform, consistency level is *NOT* met", + assertFalse("operations should not have been perform, active shard count is *NOT* met", request.processedOnPrimary.get()); assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class); } @@ -462,9 +451,9 @@ public class ReplicationOperationTests extends ESTestCase { } public TestReplicationOperation(Request request, Primary primary, - ActionListener listener, boolean executeOnReplicas, boolean checkWriteConsistency, + ActionListener listener, boolean executeOnReplicas, boolean checkActiveShardCount, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType); + super(request, primary, listener, executeOnReplicas, checkActiveShardCount, replicas, clusterStateSupplier, logger, opType); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index de2ddabb0fe..ef8ee623939 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -766,7 +766,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected boolean checkWriteConsistency() { + protected boolean checkActiveShardCount() { return false; } diff --git a/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java b/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java index 53ac2bc045a..312a01497d3 100644 --- a/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java +++ b/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java @@ -54,11 +54,8 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase { private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception { - // make sure we have enough nodes to guaranty default QUORUM consistency. - // TODO: add a smarter choice based on actual consistency (when that is randomized) - int shardsNo = numberOfReplicas + 1; - int neededNodes = shardsNo <= 2 ? 1 : shardsNo / 2 + 1; - internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(neededNodes, shardsNo)); + // TODO: add a smarter choice based on varying active shard count (when that is randomized) + internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(1, numberOfReplicas + 1)); String id = randomAsciiOfLength(5); // we will go the primary or the replica, but in a diff --git a/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java b/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java index bacc6d791d4..76cfa610a6d 100644 --- a/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java +++ b/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.search.basic; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -118,7 +117,7 @@ public class TransportSearchFailuresIT extends ESIntegTestCase { } private void index(Client client, String id, String nameValue, int age) throws IOException { - client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age)).consistencyLevel(WriteConsistencyLevel.ONE)).actionGet(); + client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet(); } private XContentBuilder source(String id, String nameValue, int age) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 39e26f382ca..2489442376b 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -690,7 +690,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621") public void testChaosSnapshot() throws Exception { final List indices = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("action.write_consistency", "one").build(); + Settings settings = Settings.builder().put(IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.getKey(), "1").build(); int initialNodes = between(1, 3); logger.info("--> start {} nodes", initialNodes); for (int i = 0; i < initialNodes; i++) { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index ce1e504fab3..0178d2e1fb6 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -215,7 +215,7 @@ public abstract class AbstractAsyncBulkByScrollAction