Core: Perform write consistency just before writing on the primary shard

Before this change the write consistency change was performed on the node that receives the write request and the node that holds the primary shard. This change removes the check on the node that receives the request, since it is redundant.

Also this change moves the write consistency check on the node that holds the primary shard to a later moment after forking of the thread to perform the actual write on the primary shard.

Closes #7873
This commit is contained in:
Martijn van Groningen 2014-09-24 15:57:03 +02:00
parent 99187e5259
commit e1a8b027d7
2 changed files with 86 additions and 67 deletions

View File

@ -19,10 +19,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.*;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
@ -36,10 +33,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -351,14 +345,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
/**
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
*/
protected boolean doStart() throws ElasticsearchException {
protected void doStart() throws ElasticsearchException {
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
}
@ -370,14 +364,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// check if we need to execute, and if not, return
if (!resolveRequest(observer.observedState(), internalRequest, listener)) {
return true;
return;
}
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return false;
return;
} else {
throw blockException;
}
@ -385,15 +379,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
listener.onFailure(e);
return true;
return;
}
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());
retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
return;
}
boolean foundPrimary = false;
@ -406,34 +399,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retry(null);
return false;
}
// check here for consistency
if (checkWriteConsistency) {
WriteConsistencyLevel consistencyLevel = defaultWriteConsistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
}
int requiredNumber = 1;
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardIt.size() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardIt.size() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardIt.size();
}
if (shardIt.sizeActive() < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber);
retry(null);
return false;
}
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned is a known node.");
return;
}
if (!primaryOperationStarted.compareAndSet(false, true)) {
return true;
return;
}
foundPrimary = true;
@ -445,14 +416,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard, observer.observedState());
performOnPrimary(shard.id(), shard);
}
} catch (Throwable t) {
listener.onFailure(t);
@ -485,7 +456,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(null);
retry(exp);
} else {
listener.onFailure(exp);
}
@ -497,15 +468,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.trace("couldn't find a eligible primary shard, scheduling for retry.");
retry(null);
return false;
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
}
return true;
}
void retry(@Nullable final Throwable failure) {
void retry(Throwable failure) {
assert failure != null;
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
listener.onFailure(failure);
return;
}
// make it threaded operation so we fork on the discovery listener thread
@ -525,26 +496,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void onTimeout(TimeValue timeout) {
if (doStart()) {
return;
}
raiseTimeoutFailure(timeout, failure);
// Try one more time...
doStart();
}
});
}
void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) {
if (failure == null) {
if (shardIt == null) {
failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
} else {
failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + internalRequest.request().toString());
}
void performOnPrimary(int primaryShardId, final ShardRouting shard) {
ClusterState clusterState = observer.observedState();
if (raiseFailureIfHaveNotEnoughActiveShardCopies(shard, clusterState)) {
return;
}
listener.onFailure(failure);
}
void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {
try {
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
performReplicas(response);
@ -613,7 +575,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
shardIt.reset();
internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
} else{
} else {
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.state() != ShardRoutingState.STARTED) {
@ -751,7 +713,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
@Override
public void onFailure(Throwable t) {}
public void onFailure(Throwable t) {
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
@ -774,6 +737,57 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) {
if (!checkWriteConsistency) {
return false;
}
final WriteConsistencyLevel consistencyLevel;
if (internalRequest.request().consistencyLevel() != WriteConsistencyLevel.DEFAULT) {
consistencyLevel = internalRequest.request().consistencyLevel();
} else {
consistencyLevel = defaultWriteConsistencyLevel;
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
if (sizeActive < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
primaryOperationStarted.set(false);
// A dedicated exception would be nice...
retryBecauseUnavailable(shard.shardId(), "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").");
return true;
} else {
return false;
}
}
void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString()));
}
}
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -55,7 +56,9 @@ public class WriteConsistencyLevelTests extends ElasticsearchIntegrationTest {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}
allowNodes("test", 2);
@ -76,7 +79,9 @@ public class WriteConsistencyLevelTests extends ElasticsearchIntegrationTest {
.setTimeout(timeValueMillis(100)).execute().actionGet();
fail("can't index, does not match consistency");
} catch (UnavailableShardsException e) {
// all is well
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: index {[test][type1][1], source[{ type1 : { \"id\" : \"1\", \"name\" : \"test\" } }]}"));
// but really, all is well
}
allowNodes("test", 3);