Refactors ActiveShardCount

This commit is contained in:
Ali Beyad 2016-07-19 12:40:38 -04:00
parent 25d8eca62d
commit d93f7d6085
20 changed files with 77 additions and 168 deletions

View File

@ -69,7 +69,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
List<Object> payloads = null; List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private long sizeInBytes = 0; private long sizeInBytes = 0;
@ -436,13 +436,13 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
* Sets the number of shard copies that must be active before proceeding with the write. * Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/ */
public BulkRequest waitForActiveShards(ActiveShardCount activeShardCount) { public BulkRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.activeShardCount = activeShardCount; this.waitForActiveShards = waitForActiveShards;
return this; return this;
} }
public ActiveShardCount waitForActiveShards() { public ActiveShardCount waitForActiveShards() {
return this.activeShardCount; return this.waitForActiveShards;
} }
@Override @Override
@ -527,7 +527,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
activeShardCount = ActiveShardCount.readFrom(in); waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt(); int size = in.readVInt();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
byte type = in.readByte(); byte type = in.readByte();
@ -552,7 +552,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
activeShardCount.writeTo(out); waitForActiveShards.writeTo(out);
out.writeVInt(requests.size()); out.writeVInt(requests.size());
for (ActionRequest<?> request : requests) { for (ActionRequest<?> request : requests) {
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {

View File

@ -115,8 +115,8 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
* Sets the number of shard copies that must be active before proceeding with the write. * Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/ */
public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) { public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(activeShardCount); request.waitForActiveShards(waitForActiveShards);
return this; return this;
} }

View File

@ -63,8 +63,15 @@ public final class ActiveShardCount implements Writeable {
return get(value); return get(value);
} }
/**
* Validates that the instance is valid for the given number of replicas in an index.
*/
public boolean validate(final int numberOfReplicas) {
return value <= numberOfReplicas + 1;
}
private static ActiveShardCount get(final int value) { private static ActiveShardCount get(final int value) {
switch (validateValue(value)) { switch (value) {
case ACTIVE_SHARD_COUNT_DEFAULT: case ACTIVE_SHARD_COUNT_DEFAULT:
return DEFAULT; return DEFAULT;
case ALL_ACTIVE_SHARDS: case ALL_ACTIVE_SHARDS:
@ -87,29 +94,6 @@ public final class ActiveShardCount implements Writeable {
return get(in.readInt()); return get(in.readInt());
} }
private static int validateValue(final int value) {
if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) {
throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]");
}
return value;
}
/**
* Resolve this instance to an actual integer value for the number of active shard counts.
* If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is
* used to determine what the actual active shard count should be. The default value indicates
* one active shard.
*/
public int resolve(final IndexMetaData indexMetaData) {
if (this == ActiveShardCount.DEFAULT) {
return 1;
} else if (this == ActiveShardCount.ALL) {
return indexMetaData.getNumberOfReplicas() + 1;
} else {
return value;
}
}
/** /**
* Parses the active shard count from the given string. Valid values are "all" for * Parses the active shard count from the given string. Valid values are "all" for
* all shard copies, null for the default value (which defaults to one shard copy), * all shard copies, null for the default value (which defaults to one shard copy),
@ -155,7 +139,7 @@ public final class ActiveShardCount implements Writeable {
return false; return false;
} }
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) { for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
if (enoughShardsActive(shardRouting.value, indexMetaData).isEnoughShardsActive() == false) { if (enoughShardsActive(shardRouting.value) == false) {
// not enough active shard copies yet // not enough active shard copies yet
return false; return false;
} }
@ -167,10 +151,14 @@ public final class ActiveShardCount implements Writeable {
* Returns true iff the active shard count in the shard routing table is enough * Returns true iff the active shard count in the shard routing table is enough
* to meet the required shard count represented by this instance. * to meet the required shard count represented by this instance.
*/ */
public EvalResult enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) {
final int totalActive = shardRoutingTable.activeShards().size(); if (this == ActiveShardCount.ALL) {
final int totalRequired = resolve(indexMetaData); return shardRoutingTable.allShardsStarted();
return new EvalResult(shardRoutingTable.activeShards().size() >= resolve(indexMetaData), totalActive, totalRequired); } else if (this == ActiveShardCount.DEFAULT) {
return shardRoutingTable.primaryShard().started();
} else {
return shardRoutingTable.activeShards().size() >= value;
}
} }
@Override @Override
@ -202,31 +190,4 @@ public final class ActiveShardCount implements Writeable {
} }
} }
/**
* The result of the evaluation of the active shard copy count against a shard routing table.
*/
public static final class EvalResult {
private final boolean enoughShardsActive;
private final int totalActive;
private final int totalRequired;
private EvalResult(boolean enoughShardsActive, int totalActive, int totalRequired) {
this.enoughShardsActive = enoughShardsActive;
this.totalActive = totalActive;
this.totalRequired = totalRequired;
}
public boolean isEnoughShardsActive() {
return enoughShardsActive;
}
public int getTotalActive() {
return totalActive;
}
public int getTotalRequired() {
return totalRequired;
}
}
} }

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -199,6 +198,9 @@ public class ReplicationOperation<
final String indexName = shardId.getIndexName(); final String indexName = shardId.getIndexName();
final ClusterState state = clusterStateSupplier.get(); final ClusterState state = clusterStateSupplier.get();
final ActiveShardCount waitForActiveShards = request.waitForActiveShards(); final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
if (waitForActiveShards == ActiveShardCount.NONE) {
return null; // not waiting for any shards
}
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName); IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName);
if (indexRoutingTable == null) { if (indexRoutingTable == null) {
logger.trace("[{}] index not found in the routing table", shardId); logger.trace("[{}] index not found in the routing table", shardId);
@ -209,17 +211,16 @@ public class ReplicationOperation<
logger.trace("[{}] shard not found in the routing table", shardId); logger.trace("[{}] shard not found in the routing table", shardId);
return "Shard " + shardId + " not found in the routing table"; return "Shard " + shardId + " not found in the routing table";
} }
IndexMetaData indexMetaData = state.getMetaData().index(indexName); if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) {
assert indexMetaData != null;
ActiveShardCount.EvalResult result = waitForActiveShards.enoughShardsActive(shardRoutingTable, indexMetaData);
if (result.isEnoughShardsActive()) {
return null; return null;
} else { } else {
final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL ? Integer.toString(shardRoutingTable.shards().size())
: waitForActiveShards.toString();
logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " + logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " +
"request [{}]", shardId, waitForActiveShards, result.getTotalActive(), result.getTotalRequired(), opType, request); "request [{}]", shardId, waitForActiveShards, shardRoutingTable.activeShards().size(),
return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " + result.getTotalActive() + resolvedShards, opType, request);
", needed " + result.getTotalRequired() + ")."; return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " +
shardRoutingTable.activeShards().size() + ", needed " + resolvedShards + ").";
} }
} }

View File

@ -63,7 +63,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
/** /**
* The number of shard copies that must be active before proceeding with the replication action. * The number of shard copies that must be active before proceeding with the replication action.
*/ */
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private long routedBasedOnClusterVersion = 0; private long routedBasedOnClusterVersion = 0;
@ -120,7 +120,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
} }
public ActiveShardCount waitForActiveShards() { public ActiveShardCount waitForActiveShards() {
return this.activeShardCount; return this.waitForActiveShards;
} }
/** /**
@ -141,8 +141,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
* total number of shard copies (number of replicas + 1). * total number of shard copies (number of replicas + 1).
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public final Request waitForActiveShards(ActiveShardCount activeShardCount) { public final Request waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.activeShardCount = activeShardCount; this.waitForActiveShards = waitForActiveShards;
return (Request) this; return (Request) this;
} }
@ -187,7 +187,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
} else { } else {
shardId = null; shardId = null;
} }
activeShardCount = ActiveShardCount.readFrom(in); waitForActiveShards = ActiveShardCount.readFrom(in);
timeout = new TimeValue(in); timeout = new TimeValue(in);
index = in.readString(); index = in.readString();
routedBasedOnClusterVersion = in.readVLong(); routedBasedOnClusterVersion = in.readVLong();
@ -203,7 +203,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
} else { } else {
out.writeBoolean(false); out.writeBoolean(false);
} }
activeShardCount.writeTo(out); waitForActiveShards.writeTo(out);
timeout.writeTo(out); timeout.writeTo(out);
out.writeString(index); out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion); out.writeVLong(routedBasedOnClusterVersion);

View File

@ -64,8 +64,8 @@ public abstract class ReplicationRequestBuilder<Request extends ReplicationReque
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public RequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) { public RequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(activeShardCount); request.waitForActiveShards(waitForActiveShards);
return (RequestBuilder) this; return (RequestBuilder) this;
} }
} }

View File

@ -75,7 +75,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
private ActiveShardCount activeShardCount = ActiveShardCount.DEFAULT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private IndexRequest upsertRequest; private IndexRequest upsertRequest;
@ -435,15 +435,15 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
} }
public ActiveShardCount waitForActiveShards() { public ActiveShardCount waitForActiveShards() {
return this.activeShardCount; return this.waitForActiveShards;
} }
/** /**
* Sets the number of shard copies that must be active before proceeding with the write. * Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/ */
public UpdateRequest waitForActiveShards(ActiveShardCount activeShardCount) { public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.activeShardCount = activeShardCount; this.waitForActiveShards = waitForActiveShards;
return this; return this;
} }
@ -705,7 +705,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
activeShardCount = ActiveShardCount.readFrom(in); waitForActiveShards = ActiveShardCount.readFrom(in);
type = in.readString(); type = in.readString();
id = in.readString(); id = in.readString();
routing = in.readOptionalString(); routing = in.readOptionalString();
@ -740,7 +740,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
activeShardCount.writeTo(out); waitForActiveShards.writeTo(out);
out.writeString(type); out.writeString(type);
out.writeString(id); out.writeString(id);
out.writeOptionalString(routing); out.writeOptionalString(routing);

View File

@ -126,8 +126,8 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
* Sets the number of shard copies that must be active before proceeding with the write. * Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/ */
public UpdateRequestBuilder setWaitForActiveShards(ActiveShardCount activeShardCount) { public UpdateRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(activeShardCount); request.waitForActiveShards(waitForActiveShards);
return this; return this;
} }

View File

@ -347,7 +347,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.setRoutingNumShards(routingNumShards); .setRoutingNumShards(routingNumShards);
// Set up everything, now locally create the index to see that things are ok, and apply // Set up everything, now locally create the index to see that things are ok, and apply
final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) { if (request.waitForActiveShards().validate(tmpImd.getNumberOfReplicas()) == false) {
throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() + throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() +
"]: cannot be greater than number of shard copies [" + "]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]"); (tmpImd.getNumberOfReplicas() + 1) + "]");

View File

@ -37,8 +37,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import static org.hamcrest.Matchers.equalTo;
/** /**
* Tests for the {@link ActiveShardCount} class * Tests for the {@link ActiveShardCount} class
*/ */
@ -47,41 +45,10 @@ public class ActiveShardCountTests extends ESTestCase {
public void testFromIntValue() { public void testFromIntValue() {
assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE); assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE);
final int value = randomIntBetween(1, 50); final int value = randomIntBetween(1, 50);
IndexMetaData indexMetaData = IndexMetaData.builder("test") assertEquals(ActiveShardCount.from(value).toString(), Integer.toString(value));
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value);
expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1))); expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1)));
} }
public void testResolve() {
// one shard
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
final int value = randomIntBetween(2, 20);
assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
// more than one shard
final int numNewShards = randomIntBetween(1, 20);
indexMetaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(numNewShards)
.build();
assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1));
assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1));
assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0));
assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value));
}
public void testSerialization() throws IOException { public void testSerialization() throws IOException {
doWriteRead(ActiveShardCount.ALL); doWriteRead(ActiveShardCount.ALL);
doWriteRead(ActiveShardCount.DEFAULT); doWriteRead(ActiveShardCount.DEFAULT);
@ -119,15 +86,11 @@ public class ActiveShardCountTests extends ESTestCase {
final String indexName = "test-idx"; final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5); final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7); final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0); final ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName); clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName); clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
} }
@ -145,14 +108,15 @@ public class ActiveShardCountTests extends ESTestCase {
final String indexName = "test-idx"; final String indexName = "test-idx";
final int numberOfShards = randomIntBetween(1, 5); final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(4, 7); final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas)); final int activeShardCount = randomIntBetween(2, numberOfReplicas);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount);
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName); clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); clusterState = startWaitOnShards(clusterState, indexName, activeShardCount - 1);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName); clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
@ -168,7 +132,7 @@ public class ActiveShardCountTests extends ESTestCase {
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName); clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); clusterState = startLessThanWaitOnShards(clusterState, indexName, numberOfReplicas - randomIntBetween(1, numberOfReplicas));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName); clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
@ -184,10 +148,6 @@ public class ActiveShardCountTests extends ESTestCase {
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startPrimaries(clusterState, indexName); clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
clusterState = startAllShards(clusterState, indexName); clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
} }
@ -223,16 +183,15 @@ public class ActiveShardCountTests extends ESTestCase {
return ClusterState.builder(clusterState).routingTable(routingTable).build(); return ClusterState.builder(clusterState).routingTable(routingTable).build();
} }
private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) {
final ActiveShardCount waitForActiveShards) {
RoutingTable routingTable = clusterState.routingTable(); RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName); IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) { for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value; final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
assert shardRoutingTable.getSize() > 2; assert shardRoutingTable.getSize() > 2;
int numToStart = numShardsToStart;
// want less than half, and primary is already started // want less than half, and primary is already started
int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) { for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) { if (shardRouting.primary()) {
assertTrue(shardRouting.active()); assertTrue(shardRouting.active());
@ -250,15 +209,14 @@ public class ActiveShardCountTests extends ESTestCase {
return ClusterState.builder(clusterState).routingTable(routingTable).build(); return ClusterState.builder(clusterState).routingTable(routingTable).build();
} }
private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) {
final ActiveShardCount waitForActiveShards) {
RoutingTable routingTable = clusterState.routingTable(); RoutingTable routingTable = clusterState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(indexName); IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) { for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
final IndexShardRoutingTable shardRoutingTable = shardEntry.value; final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
assert shardRoutingTable.getSize() > 2; assert shardRoutingTable.getSize() > 2;
int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started int numToStart = numShardsToStart;
for (ShardRouting shardRouting : shardRoutingTable.getShards()) { for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
if (shardRouting.primary()) { if (shardRouting.primary()) {
assertTrue(shardRouting.active()); assertTrue(shardRouting.active());

View File

@ -20,14 +20,12 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -35,7 +33,6 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -260,13 +257,9 @@ public class ReplicationOperationTests extends ESTestCase {
final int assignedReplicas = randomInt(2); final int assignedReplicas = randomInt(2);
final int unassignedReplicas = randomInt(2); final int unassignedReplicas = randomInt(2);
final int totalShards = 1 + assignedReplicas + unassignedReplicas; final int totalShards = 1 + assignedReplicas + unassignedReplicas;
final IndexMetaData indexMetaData = IndexMetaData.builder(index) final int activeShardCount = randomIntBetween(0, totalShards);
.settings(Settings.builder().put("index.version.created", Version.CURRENT.id)) Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(activeShardCount));
.numberOfReplicas(assignedReplicas + unassignedReplicas) final boolean passesActiveShardCheck = activeShardCount <= assignedReplicas + 1;
.numberOfShards(randomIntBetween(1, 5))
.build();
Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(randomIntBetween(0, totalShards)));
final boolean passesActiveShardCheck = request.waitForActiveShards().resolve(indexMetaData) <= assignedReplicas + 1;
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
for (int i = 0; i < assignedReplicas; i++) { for (int i = 0; i < assignedReplicas; i++) {

View File

@ -54,9 +54,8 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception { private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception {
// TODO: add a smarter choice based on varying active shard count (when that is randomized) // TODO: randomize the wait for active shards value on index creation and ensure the appropriate
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(1, numberOfReplicas + 1)); // number of data nodes are started for the randomized active shard count value
String id = randomAsciiOfLength(5); String id = randomAsciiOfLength(5);
// we will go the primary or the replica, but in a // we will go the primary or the replica, but in a
// randomized re-creatable manner // randomized re-creatable manner

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -47,7 +46,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -690,11 +688,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621") @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621")
public void testChaosSnapshot() throws Exception { public void testChaosSnapshot() throws Exception {
final List<String> indices = new CopyOnWriteArrayList<>(); final List<String> indices = new CopyOnWriteArrayList<>();
Settings settings = Settings.builder().put(IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.getKey(), "1").build();
int initialNodes = between(1, 3); int initialNodes = between(1, 3);
logger.info("--> start {} nodes", initialNodes); logger.info("--> start {} nodes", initialNodes);
for (int i = 0; i < initialNodes; i++) { for (int i = 0; i < initialNodes; i++) {
internalCluster().startNode(settings); internalCluster().startNode();
} }
logger.info("--> creating repository"); logger.info("--> creating repository");
@ -713,7 +710,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
int asyncNodes = between(0, 5); int asyncNodes = between(0, 5);
logger.info("--> start {} additional nodes asynchronously", asyncNodes); logger.info("--> start {} additional nodes asynchronously", asyncNodes);
InternalTestCluster.Async<List<String>> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes, settings); InternalTestCluster.Async<List<String>> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes);
int asyncIndices = between(0, 10); int asyncIndices = between(0, 10);
logger.info("--> create {} additional indices asynchronously", asyncIndices); logger.info("--> create {} additional indices asynchronously", asyncIndices);

View File

@ -18,7 +18,7 @@
"params": { "params": {
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the bulk operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"refresh": { "refresh": {
"type" : "enum", "type" : "enum",

View File

@ -25,7 +25,7 @@
"params": { "params": {
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the delete operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"parent": { "parent": {
"type" : "string", "type" : "string",

View File

@ -179,7 +179,7 @@
}, },
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the delete by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"scroll_size": { "scroll_size": {
"type": "integer", "type": "integer",

View File

@ -24,7 +24,7 @@
"params": { "params": {
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the index operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"op_type": { "op_type": {
"type" : "enum", "type" : "enum",

View File

@ -18,7 +18,7 @@
}, },
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"wait_for_completion": { "wait_for_completion": {
"type" : "boolean", "type" : "boolean",

View File

@ -25,7 +25,7 @@
"params": { "params": {
"wait_for_active_shards": { "wait_for_active_shards": {
"type": "string", "type": "string",
"description": "Explicit active shard count required for the operation" "description": "Sets the number of shard copies that must be active before proceeding with the update operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"fields": { "fields": {
"type": "list", "type": "list",

View File

@ -187,7 +187,7 @@
}, },
"wait_for_active_shards": { "wait_for_active_shards": {
"type" : "string", "type" : "string",
"description" : "Explicit active shard count required for the operation" "description" : "Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)"
}, },
"scroll_size": { "scroll_size": {
"type": "integer", "type": "integer",