Removes the notion of write consistency level across all APIs in

favor of waiting for active shard copy count (wait_for_active_shards).
This commit is contained in:
Ali Beyad 2016-07-15 15:41:34 -04:00
parent 9f88a8194a
commit 25d8eca62d
44 changed files with 277 additions and 328 deletions

View File

@ -681,6 +681,7 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]AutoCreateIndexTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptionsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
@ -799,7 +800,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]cbor[/\\]JsonVsCborTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]smile[/\\]JsonVsSmileTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]consistencylevel[/\\]WriteConsistencyLevelIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
@ -1025,7 +1025,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWhileRelocatingIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomIOExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportSearchFailuresIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportTwoNodesSearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ChildQuerySearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ParentFieldLoadingIT.java" checks="LineLength" />

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
/**
* Write Consistency Level control how many replicas should be active for a write operation to occur (a write operation
* can be index, or delete).
*
*
*/
public enum WriteConsistencyLevel {
DEFAULT((byte) 0),
ONE((byte) 1),
QUORUM((byte) 2),
ALL((byte) 3);
private final byte id;
WriteConsistencyLevel(byte id) {
this.id = id;
}
public byte id() {
return id;
}
public static WriteConsistencyLevel fromId(byte value) {
if (value == 0) {
return DEFAULT;
} else if (value == 1) {
return ONE;
} else if (value == 2) {
return QUORUM;
} else if (value == 3) {
return ALL;
}
throw new IllegalArgumentException("No write consistency match [" + value + "]");
}
public static WriteConsistencyLevel fromString(String value) {
if (value.equals("default")) {
return DEFAULT;
} else if (value.equals("one")) {
return ONE;
} else if (value.equals("quorum")) {
return QUORUM;
} else if (value.equals("all")) {
return ALL;
}
throw new IllegalArgumentException("No write consistency match [" + value + "]");
}
}

View File

@ -70,7 +70,7 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
}
@Override
protected boolean checkWriteConsistency() {
protected boolean checkActiveShardCount() {
return false;
}

View File

@ -71,7 +71,7 @@ public class TransportShardRefreshAction
}
@Override
protected boolean checkWriteConsistency() {
protected boolean checkActiveShardCount() {
return false;
}

View File

@ -23,10 +23,11 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -68,7 +69,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private long sizeInBytes = 0;
@ -432,15 +433,16 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public BulkRequest waitForActiveShards(ActiveShardCount activeShardCount) {
this.activeShardCount = activeShardCount;
return this;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
public ActiveShardCount waitForActiveShards() {
return this.activeShardCount;
}
@Override
@ -525,7 +527,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
activeShardCount = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
@ -550,7 +552,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(consistencyLevel.id());
activeShardCount.writeTo(out);
out.writeVInt(requests.size());
for (ActionRequest<?> request : requests) {
if (request instanceof IndexRequest) {

View File

@ -20,12 +20,13 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
@ -111,10 +112,11 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
}
/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) {
request.waitForActiveShards(activeShardCount);
return this;
}

View File

@ -339,7 +339,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());

View File

@ -163,7 +163,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());
int slot = 0;

View File

@ -155,7 +155,7 @@ public final class ActiveShardCount implements Writeable {
return false;
}
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
if (enoughShardsActive(shardRouting.value, indexMetaData) == false) {
if (enoughShardsActive(shardRouting.value, indexMetaData).isEnoughShardsActive() == false) {
// not enough active shard copies yet
return false;
}
@ -167,12 +167,10 @@ public final class ActiveShardCount implements Writeable {
* Returns true iff the active shard count in the shard routing table is enough
* to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) {
if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) {
// not enough active shard copies yet
return false;
}
return true;
public EvalResult enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) {
final int totalActive = shardRoutingTable.activeShards().size();
final int totalRequired = resolve(indexMetaData);
return new EvalResult(shardRoutingTable.activeShards().size() >= resolve(indexMetaData), totalActive, totalRequired);
}
@Override
@ -194,18 +192,41 @@ public final class ActiveShardCount implements Writeable {
@Override
public String toString() {
final String valStr;
switch (value) {
case ALL_ACTIVE_SHARDS:
valStr = "ALL";
break;
return "ALL";
case ACTIVE_SHARD_COUNT_DEFAULT:
valStr = "DEFAULT";
break;
return "DEFAULT";
default:
valStr = Integer.toString(value);
return Integer.toString(value);
}
}
/**
* The result of the evaluation of the active shard copy count against a shard routing table.
*/
public static final class EvalResult {
private final boolean enoughShardsActive;
private final int totalActive;
private final int totalRequired;
private EvalResult(boolean enoughShardsActive, int totalActive, int totalRequired) {
this.enoughShardsActive = enoughShardsActive;
this.totalActive = totalActive;
this.totalRequired = totalRequired;
}
public boolean isEnoughShardsActive() {
return enoughShardsActive;
}
public int getTotalActive() {
return totalActive;
}
public int getTotalRequired() {
return totalRequired;
}
return "ActiveShardCount[" + valStr + "]";
}
}

View File

@ -22,9 +22,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -68,7 +69,7 @@ public class ReplicationOperation<
private final AtomicInteger pendingShards = new AtomicInteger();
private final AtomicInteger successfulShards = new AtomicInteger();
private final boolean executeOnReplicas;
private final boolean checkWriteConsistency;
private final boolean checkActiveShardCount;
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
@ -80,10 +81,10 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, boolean checkWriteConsistency,
boolean executeOnReplicas, boolean checkActiveShardCount,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
this.checkWriteConsistency = checkWriteConsistency;
this.checkActiveShardCount = checkActiveShardCount;
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
this.primary = primary;
@ -95,12 +96,12 @@ public class ReplicationOperation<
}
public void execute() throws Exception {
final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
final String activeShardCountFailure = checkActiveShardCount ? checkActiveShardCount() : null;
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (writeConsistencyFailure != null) {
if (activeShardCountFailure != null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
return;
}
@ -190,46 +191,35 @@ public class ReplicationOperation<
}
/**
* checks whether we can perform a write based on the write consistency setting
* returns **null* if OK to proceed, or a string describing the reason to stop
* Checks whether we can perform a write based on the required active shard count setting.
* Returns **null* if OK to proceed, or a string describing the reason to stop
*/
String checkWriteConsistency() {
assert request.consistencyLevel() != WriteConsistencyLevel.DEFAULT : "consistency level should be set";
String checkActiveShardCount() {
final ShardId shardId = primary.routingEntry().shardId();
final String indexName = shardId.getIndexName();
final ClusterState state = clusterStateSupplier.get();
final WriteConsistencyLevel consistencyLevel = request.consistencyLevel();
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica,
// quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName);
if (indexRoutingTable == null) {
logger.trace("[{}] index not found in the routing table", shardId);
return "Index " + indexName + " not found in the routing table";
}
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable == null) {
logger.trace("[{}] shard not found in the routing table", shardId);
return "Shard " + shardId + " not found in the routing table";
}
IndexMetaData indexMetaData = state.getMetaData().index(indexName);
assert indexMetaData != null;
ActiveShardCount.EvalResult result = waitForActiveShards.enoughShardsActive(shardRoutingTable, indexMetaData);
if (sizeActive < requiredNumber) {
logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
" op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed "
+ requiredNumber + ").";
} else {
if (result.isEnoughShardsActive()) {
return null;
} else {
logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " +
"request [{}]", shardId, waitForActiveShards, result.getTotalActive(), result.getTotalRequired(), opType, request);
return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " + result.getTotalActive() +
", needed " + result.getTotalRequired() + ").";
}
}

View File

@ -22,9 +22,9 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -60,7 +60,10 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
/**
* The number of shard copies that must be active before proceeding with the replication action.
*/
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT;
private long routedBasedOnClusterVersion = 0;
@ -116,8 +119,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
public ActiveShardCount waitForActiveShards() {
return this.activeShardCount;
}
/**
@ -130,11 +133,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the replication
* operation. Defaults to {@link ActiveShardCount#DEFAULT}, which requires one shard copy
* (the primary) to be active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active. Otherwise, use
* {@link ActiveShardCount#from(int)} to set this value to any non-negative integer, up to the
* total number of shard copies (number of replicas + 1).
*/
@SuppressWarnings("unchecked")
public final Request consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public final Request waitForActiveShards(ActiveShardCount activeShardCount) {
this.activeShardCount = activeShardCount;
return (Request) this;
}
@ -179,7 +187,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
} else {
shardId = null;
}
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
activeShardCount = ActiveShardCount.readFrom(in);
timeout = new TimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
@ -195,7 +203,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
} else {
out.writeBoolean(false);
}
out.writeByte(consistencyLevel.id());
activeShardCount.writeTo(out);
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
@ -60,11 +60,12 @@ public abstract class ReplicationRequestBuilder<Request extends ReplicationReque
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
public RequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) {
request.waitForActiveShards(activeShardCount);
return (RequestBuilder) this;
}
}

View File

@ -23,8 +23,8 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.client.transport.NoNodeAvailableException;
@ -49,6 +49,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
@ -90,7 +91,7 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final IndicesService indicesService;
private final ShardStateAction shardStateAction;
private final WriteConsistencyLevel defaultWriteConsistencyLevel;
private final ActiveShardCount defaultWaitForActiveShards;
private final TransportRequestOptions transportOptions;
private final String executor;
@ -122,7 +123,7 @@ public abstract class TransportReplicationAction<
this.transportOptions = transportOptions();
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
this.defaultWaitForActiveShards = IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.get(settings);
this.replicasProxy = new ReplicasProxy();
}
@ -165,9 +166,9 @@ public abstract class TransportReplicationAction<
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);
/**
* True if write consistency should be checked for an implementation
* True if the active shard count should be checked before proceeding with the replication action.
*/
protected boolean checkWriteConsistency() {
protected boolean checkActiveShardCount() {
return true;
}
@ -353,7 +354,7 @@ public abstract class TransportReplicationAction<
Request request, ActionListener<PrimaryResult> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName
executeOnReplicas, checkActiveShardCount(), replicasProxy, clusterService::state, logger, actionName
);
}
}
@ -566,8 +567,8 @@ public abstract class TransportReplicationAction<
}
// resolve all derived request fields, so we can route and apply it
if (request.consistencyLevel() == WriteConsistencyLevel.DEFAULT) {
request.consistencyLevel(defaultWriteConsistencyLevel);
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
request.waitForActiveShards(defaultWaitForActiveShards);
}
resolveRequest(state.metaData(), indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";

View File

@ -131,7 +131,7 @@ public class UpdateHelper extends AbstractComponent {
.setRefreshPolicy(request.getRefreshPolicy())
.routing(request.routing())
.parent(request.parent())
.consistencyLevel(request.consistencyLevel());
.waitForActiveShards(request.waitForActiveShards());
if (request.versionType() != VersionType.INTERNAL) {
// in all but the internal versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(request.versionType());
@ -224,14 +224,14 @@ public class UpdateHelper extends AbstractComponent {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(updateVersion).versionType(request.versionType())
.consistencyLevel(request.consistencyLevel())
.waitForActiveShards(request.waitForActiveShards())
.timestamp(timestamp).ttl(ttl)
.setRefreshPolicy(request.getRefreshPolicy());
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(updateVersion).versionType(request.versionType())
.consistencyLevel(request.consistencyLevel())
.waitForActiveShards(request.waitForActiveShards())
.setRefreshPolicy(request.getRefreshPolicy());
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {

View File

@ -21,9 +21,10 @@ package org.elasticsearch.action.update;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
@ -74,7 +75,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT;
private IndexRequest upsertRequest;
@ -433,15 +434,16 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return refreshPolicy;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
public ActiveShardCount waitForActiveShards() {
return this.activeShardCount;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public UpdateRequest waitForActiveShards(ActiveShardCount activeShardCount) {
this.activeShardCount = activeShardCount;
return this;
}
@ -703,7 +705,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
activeShardCount = ActiveShardCount.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
@ -738,7 +740,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(consistencyLevel.id());
activeShardCount.writeTo(out);
out.writeString(type);
out.writeString(id);
out.writeOptionalString(routing);

View File

@ -19,9 +19,10 @@
package org.elasticsearch.action.update;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
@ -122,10 +123,11 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public UpdateRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
public UpdateRequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) {
request.waitForActiveShards(activeShardCount);
return this;
}

View File

@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
EngineConfig.INDEX_CODEC_SETTING,
IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING,
// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.logging.ESLogger;
@ -128,6 +129,16 @@ public final class IndexSettings {
public static final Setting<Integer> MAX_SLICES_PER_SCROLL = Setting.intSetting("index.max_slices_per_scroll",
1024, 1, Property.Dynamic, Property.IndexScope);
/**
* The number of active shard copies required for a write operation.
*/
public static final Setting<ActiveShardCount> WAIT_FOR_ACTIVE_SHARDS_SETTING =
new Setting<>("index.write.wait_for_active_shards",
"1",
ActiveShardCount::parseString,
Setting.Property.Dynamic,
Setting.Property.IndexScope);
private final Index index;
private final Version version;
private final ESLogger logger;

View File

@ -19,11 +19,11 @@
package org.elasticsearch.rest.action.bulk;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
@ -79,9 +79,9 @@ public class RestBulkAction extends BaseRestHandler {
String defaultPipeline = request.param("pipeline");
String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null;
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));

View File

@ -19,8 +19,8 @@
package org.elasticsearch.rest.action.delete;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -55,9 +55,9 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.version(RestActions.parseVersion(request));
deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
deleteRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));

View File

@ -19,8 +19,8 @@
package org.elasticsearch.rest.action.index;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -98,9 +98,9 @@ public class RestIndexAction extends BaseRestHandler {
}
}
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.rest.action.update;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
@ -53,9 +53,9 @@ public class RestUpdateAction extends BaseRestHandler {
updateRequest.parent(request.param("parent"));
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
updateRequest.setRefreshPolicy(request.param("refresh"));
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
updateRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
updateRequest.docAsUpsert(request.paramAsBoolean("doc_as_upsert", updateRequest.docAsUpsert()));
String sField = request.param("fields");

View File

@ -17,10 +17,9 @@
* under the License.
*/
package org.elasticsearch.consistencylevel;
package org.elasticsearch.action.support;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
@ -35,26 +34,26 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.equalTo;
/**
*
* Tests setting the active shard count for replication operations (e.g. index) operates correctly.
*/
public class WriteConsistencyLevelIT extends ESIntegTestCase {
public void testWriteConsistencyLevelReplication2() throws Exception {
public class WaitActiveShardCountIT extends ESIntegTestCase {
public void testReplicationWaitsForActiveShardCount() throws Exception {
CreateIndexResponse createIndexResponse =
prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2))
.get();
prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)).get();
assertAcked(createIndexResponse);
// indexing, by default, will work (ONE consistency level)
client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).setConsistencyLevel(WriteConsistencyLevel.ONE).execute().actionGet();
// indexing, by default, will work (waiting for one shard copy only)
client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).execute().actionGet();
ActiveShardCount activeShardCount = ActiveShardCount.from(2); // wait for two active shard copies
try {
client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
.setConsistencyLevel(WriteConsistencyLevel.QUORUM)
.setWaitForActiveShards(activeShardCount)
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
fail("can't index, does not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + activeShardCount + "] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
// but really, all is well
}
@ -71,19 +70,19 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
// this should work, since we now have
// this should work, since we now have two
client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
.setConsistencyLevel(WriteConsistencyLevel.QUORUM)
.setWaitForActiveShards(activeShardCount)
.setTimeout(timeValueSeconds(1)).execute().actionGet();
try {
client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setWaitForActiveShards(ActiveShardCount.ALL)
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
fail("can't index, not enough active shard copies");
} catch (UnavailableShardsException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]"));
// but really, all is well
}
@ -93,9 +92,9 @@ public class WriteConsistencyLevelIT extends ESIntegTestCase {
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
// this should work, since we now have
// this should work, since we now have all shards started
client().prepareIndex("test", "type1", "1").setSource(source("1", "test"))
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setWaitForActiveShards(ActiveShardCount.ALL)
.setTimeout(timeValueSeconds(1)).execute().actionGet();
}

View File

@ -20,12 +20,14 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -33,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
@ -251,34 +254,20 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
}
public void testWriteConsistency() throws Exception {
public void testWaitForActiveShards() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final int assignedReplicas = randomInt(2);
final int unassignedReplicas = randomInt(2);
final int totalShards = 1 + assignedReplicas + unassignedReplicas;
final boolean passesWriteConsistency;
Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values()));
switch (request.consistencyLevel()) {
case ONE:
passesWriteConsistency = true;
break;
case DEFAULT:
case QUORUM:
if (totalShards <= 2) {
passesWriteConsistency = true; // primary is enough
} else {
passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1;
}
// we have to reset default (as the transport replication action will do)
request.consistencyLevel(WriteConsistencyLevel.QUORUM);
break;
case ALL:
passesWriteConsistency = unassignedReplicas == 0;
break;
default:
throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]");
}
final IndexMetaData indexMetaData = IndexMetaData.builder(index)
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id))
.numberOfReplicas(assignedReplicas + unassignedReplicas)
.numberOfShards(randomIntBetween(1, 5))
.build();
Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(randomIntBetween(0, totalShards)));
final boolean passesActiveShardCheck = request.waitForActiveShards().resolve(indexMetaData) <= assignedReplicas + 1;
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
@ -288,10 +277,10 @@ public class ReplicationOperationTests extends ESTestCase {
}
final ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates);
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." +
logger.debug("using active shard count of [{}], assigned shards [{}], total shards [{}]." +
" expecting op to [{}]. using state: \n{}",
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas,
passesWriteConsistency ? "succeed" : "retry",
request.waitForActiveShards(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas,
passesActiveShardCheck ? "succeed" : "retry",
state.prettyPrint());
final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
@ -301,15 +290,15 @@ public class ReplicationOperationTests extends ESTestCase {
new TestPrimary(primaryShard, primaryTerm),
listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test");
if (passesWriteConsistency) {
assertThat(op.checkWriteConsistency(), nullValue());
if (passesActiveShardCheck) {
assertThat(op.checkActiveShardCount(), nullValue());
op.execute();
assertTrue("operations should have been performed, consistency level is met",
assertTrue("operations should have been performed, active shard count is met",
request.processedOnPrimary.get());
} else {
assertThat(op.checkWriteConsistency(), notNullValue());
assertThat(op.checkActiveShardCount(), notNullValue());
op.execute();
assertFalse("operations should not have been perform, consistency level is *NOT* met",
assertFalse("operations should not have been perform, active shard count is *NOT* met",
request.processedOnPrimary.get());
assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class);
}
@ -462,9 +451,9 @@ public class ReplicationOperationTests extends ESTestCase {
}
public TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas, boolean checkWriteConsistency,
ActionListener<TestPrimary.Result> listener, boolean executeOnReplicas, boolean checkActiveShardCount,
Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType);
super(request, primary, listener, executeOnReplicas, checkActiveShardCount, replicas, clusterStateSupplier, logger, opType);
}
}

View File

@ -766,7 +766,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
@Override
protected boolean checkWriteConsistency() {
protected boolean checkActiveShardCount() {
return false;
}

View File

@ -54,11 +54,8 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception {
// make sure we have enough nodes to guaranty default QUORUM consistency.
// TODO: add a smarter choice based on actual consistency (when that is randomized)
int shardsNo = numberOfReplicas + 1;
int neededNodes = shardsNo <= 2 ? 1 : shardsNo / 2 + 1;
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(neededNodes, shardsNo));
// TODO: add a smarter choice based on varying active shard count (when that is randomized)
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(1, numberOfReplicas + 1));
String id = randomAsciiOfLength(5);
// we will go the primary or the replica, but in a

View File

@ -20,7 +20,6 @@
package org.elasticsearch.search.basic;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
@ -118,7 +117,7 @@ public class TransportSearchFailuresIT extends ESIntegTestCase {
}
private void index(Client client, String id, String nameValue, int age) throws IOException {
client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age)).consistencyLevel(WriteConsistencyLevel.ONE)).actionGet();
client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet();
}
private XContentBuilder source(String id, String nameValue, int age) throws IOException {

View File

@ -690,7 +690,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621")
public void testChaosSnapshot() throws Exception {
final List<String> indices = new CopyOnWriteArrayList<>();
Settings settings = Settings.builder().put("action.write_consistency", "one").build();
Settings settings = Settings.builder().put(IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.getKey(), "1").build();
int initialNodes = between(1, 3);
logger.info("--> start {} nodes", initialNodes);
for (int i = 0; i < initialNodes; i++) {

View File

@ -215,7 +215,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
return;
}
request.timeout(mainRequest.getTimeout());
request.consistencyLevel(mainRequest.getConsistency());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
if (logger.isDebugEnabled()) {
logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes()));

View File

@ -21,7 +21,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -108,9 +108,9 @@ public abstract class AbstractBaseReindexRestHandler<
request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh()));
request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout()));
String consistency = restRequest.param("consistency");
if (consistency != null) {
request.setConsistency(WriteConsistencyLevel.fromString(consistency));
String waitForActiveShards = restRequest.param("wait_for_active_shards");
if (waitForActiveShards != null) {
request.setWaitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Float requestsPerSecond = parseRequestsPerSecond(restRequest);

View File

@ -21,8 +21,8 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -71,9 +71,9 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
private TimeValue timeout = ReplicationRequest.DEFAULT_TIMEOUT;
/**
* Consistency level for write requests.
* The number of shard copies that must be active before proceeding with the write.
*/
private WriteConsistencyLevel consistency = WriteConsistencyLevel.DEFAULT;
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT;
/**
* Initial delay after a rejection before retrying a bulk request. With the default maxRetries the total backoff for retrying rejections
@ -223,17 +223,18 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
}
/**
* Consistency level for write requests.
* The number of shard copies that must be active before proceeding with the write.
*/
public WriteConsistencyLevel getConsistency() {
return consistency;
public ActiveShardCount getWaitForActiveShards() {
return activeShardCount;
}
/**
* Consistency level for write requests.
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public Self setConsistency(WriteConsistencyLevel consistency) {
this.consistency = consistency;
public Self setWaitForActiveShards(ActiveShardCount activeShardCount) {
this.activeShardCount = activeShardCount;
return self();
}
@ -317,7 +318,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
size = in.readVInt();
refresh = in.readBoolean();
timeout = new TimeValue(in);
consistency = WriteConsistencyLevel.fromId(in.readByte());
activeShardCount = ActiveShardCount.readFrom(in);
retryBackoffInitialTime = new TimeValue(in);
maxRetries = in.readVInt();
requestsPerSecond = in.readFloat();
@ -331,7 +332,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
out.writeVInt(size);
out.writeBoolean(refresh);
timeout.writeTo(out);
out.writeByte(consistency.id());
activeShardCount.writeTo(out);
retryBackoffInitialTime.writeTo(out);
out.writeVInt(maxRetries);
out.writeFloat(requestsPerSecond);

View File

@ -21,8 +21,9 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
@ -98,10 +99,11 @@ public abstract class AbstractBulkByScrollRequestBuilder<
}
/**
* Consistency level for write requests.
* The number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public Self consistency(WriteConsistencyLevel consistency) {
request.setConsistency(consistency);
public Self waitForActiveShards(ActiveShardCount activeShardCount) {
request.setWaitForActiveShards(activeShardCount);
return self();
}

View File

@ -20,10 +20,10 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -104,7 +104,7 @@ public class RoundTripTests extends ESTestCase {
request.setAbortOnVersionConflict(random().nextBoolean());
request.setRefresh(rarely());
request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test"));
request.setConsistency(randomFrom(WriteConsistencyLevel.values()));
request.setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(0, 10)));
request.setScript(random().nextBoolean() ? null : randomScript());
request.setRequestsPerSecond(between(0, Integer.MAX_VALUE));
}
@ -116,7 +116,7 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.isAbortOnVersionConflict(), tripped.isAbortOnVersionConflict());
assertEquals(request.isRefresh(), tripped.isRefresh());
assertEquals(request.getTimeout(), tripped.getTimeout());
assertEquals(request.getConsistency(), tripped.getConsistency());
assertEquals(request.getWaitForActiveShards(), tripped.getWaitForActiveShards());
assertEquals(request.getScript(), tripped.getScript());
assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime());
assertEquals(request.getMaxRetries(), tripped.getMaxRetries());
@ -234,7 +234,7 @@ public class RoundTripTests extends ESTestCase {
assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass());
assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage());
}
}
private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) {

View File

@ -1,5 +1,5 @@
---
"can override consistency":
"can override wait_for_active_shards":
- do:
indices.create:
index: test
@ -12,7 +12,6 @@
type: test
id: 1
body: {"text": "test"}
consistency: one
- do:
indices.refresh: {}
@ -21,12 +20,13 @@
delete_by_query:
index: test
timeout: 1s
wait_for_active_shards: 4
body:
query:
match_all: {}
- match:
failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
- do:
indices.refresh: {}
@ -40,7 +40,7 @@
- do:
delete_by_query:
index: test
consistency: one
wait_for_active_shards: 1
body:
query:
match_all: {}

View File

@ -1,5 +1,5 @@
---
"can override consistency":
"can override wait_for_active_shards":
- do:
indices.create:
index: dest
@ -12,7 +12,6 @@
type: test
id: 1
body: {"text": "test"}
consistency: one
- do:
indices.refresh: {}
@ -20,17 +19,18 @@
catch: unavailable
reindex:
timeout: 1s
wait_for_active_shards: 4
body:
source:
index: src
dest:
index: dest
- match:
failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/
failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/
- do:
reindex:
consistency: one
wait_for_active_shards: 1
body:
source:
index: src

View File

@ -1,5 +1,5 @@
---
"can override consistency":
"can override wait_for_active_shards":
- do:
indices.create:
index: test
@ -12,7 +12,6 @@
type: test
id: 1
body: {"text": "test"}
consistency: one
- do:
indices.refresh: {}
@ -20,14 +19,15 @@
catch: unavailable
update_by_query:
index: test
wait_for_active_shards: 4
timeout: 1s
- match:
failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
- do:
update_by_query:
index: test
consistency: one
wait_for_active_shards: 1
- match: {failures: []}
- match: {updated: 1}
- match: {version_conflicts: 0}

View File

@ -16,10 +16,9 @@
}
},
"params": {
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"refresh": {
"type" : "enum",

View File

@ -23,10 +23,9 @@
}
},
"params": {
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Specific write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"parent": {
"type" : "string",

View File

@ -177,10 +177,9 @@
"default": "1m",
"description" : "Time each individual bulk request should wait for shards that are unavailable."
},
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"scroll_size": {
"type": "integer",

View File

@ -22,10 +22,9 @@
}
},
"params": {
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"op_type": {
"type" : "enum",

View File

@ -16,10 +16,9 @@
"default": "1m",
"description" : "Time each individual bulk request should wait for shards that are unavailable."
},
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"wait_for_completion": {
"type" : "boolean",

View File

@ -23,10 +23,9 @@
}
},
"params": {
"consistency": {
"type": "enum",
"options": ["one", "quorum", "all"],
"description": "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type": "string",
"description": "Explicit active shard count required for the operation"
},
"fields": {
"type": "list",

View File

@ -185,10 +185,9 @@
"default": "1m",
"description" : "Time each individual bulk request should wait for shards that are unavailable."
},
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
"wait_for_active_shards": {
"type" : "string",
"description" : "Explicit active shard count required for the operation"
},
"scroll_size": {
"type": "integer",

View File

@ -45,7 +45,7 @@ public class ClientYamlSuiteRestApiParserTests extends AbstractParserTestCase {
assertThat(restApi.getPathParts().get(1), equalTo("index"));
assertThat(restApi.getPathParts().get(2), equalTo("type"));
assertThat(restApi.getParams().size(), equalTo(4));
assertThat(restApi.getParams(), contains("consistency", "op_type", "parent", "refresh"));
assertThat(restApi.getParams(), contains("wait_for_active_shards", "op_type", "parent", "refresh"));
assertThat(restApi.isBodySupported(), equalTo(true));
assertThat(restApi.isBodyRequired(), equalTo(true));
}
@ -163,10 +163,9 @@ public class ClientYamlSuiteRestApiParserTests extends AbstractParserTestCase {
" }\n" +
" } ,\n" +
" \"params\": {\n" +
" \"consistency\": {\n" +
" \"type\" : \"enum\",\n" +
" \"options\" : [\"one\", \"quorum\", \"all\"],\n" +
" \"description\" : \"Explicit write consistency setting for the operation\"\n" +
" \"wait_for_active_shards\": {\n" +
" \"type\" : \"string\",\n" +
" \"description\" : \"The number of active shard copies required to perform the operation\"\n" +
" },\n" +
" \"op_type\": {\n" +
" \"type\" : \"enum\",\n" +