Refactor TransportShardReplicationOperationAction

Refactor the state management of TransportShardReplicationOperationAction into two clearly separated phases: a Primary phase and a Replication phase. The Primary phase is responsible for routing the request to the node holding the primary, validating it, and performing the operation on the primary. The Replication phase is responsible for sending the request to the replicas and managing their responses.

This also adds unit test infrastructure for this class, along with some basic tests that we can extend as development continues.

Closes #10749
Boaz Leskes committed on 2015-04-21 15:52:44 +02:00
parent b444d2c31a
commit 5bdfdc42d9
5 changed files with 1458 additions and 352 deletions
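
The heart of the refactoring is the hand-off between the two new inner classes: PrimaryPhase owns all state until the operation has succeeded on the primary, then constructs a ReplicationPhase and hands responsibility to it exactly once. The following is a minimal standalone sketch of that flow. The class names follow the diff, but everything Elasticsearch-specific is stubbed out, so treat it as an illustration of the shape of the change rather than the implementation itself.

interface Listener<R> {
    void onResponse(R response);
    void onFailure(Throwable t);
}

class TwoPhaseSketch<Response> {

    Response executeOnPrimary() {
        return null; // stub: the actual operation on the primary shard
    }

    void sendToReplicas(Runnable onAllDone) {
        onAllDone.run(); // stub: fan out to replicas and count down pending responses
    }

    class PrimaryPhase implements Runnable {
        final Listener<Response> listener;

        PrimaryPhase(Listener<Response> listener) {
            this.listener = listener;
        }

        @Override
        public void run() {
            try {
                // stub: check cluster blocks, resolve the primary shard, check write
                // consistency, then route to the node holding the primary or run locally
                Response primaryResult = executeOnPrimary();
                // the primary operation succeeded: transfer state responsibility
                // to the replication phase exactly once
                new ReplicationPhase(primaryResult, listener).run();
            } catch (Throwable t) {
                listener.onFailure(t); // stub: retryable failures would schedule a retry instead
            }
        }
    }

    class ReplicationPhase implements Runnable {
        final Response finalResponse;
        final Listener<Response> listener;

        ReplicationPhase(Response finalResponse, Listener<Response> listener) {
            this.finalResponse = finalResponse;
            this.listener = listener;
        }

        @Override
        public void run() {
            // send the replica request to all assigned copies; when the last pending
            // response arrives, answer the original listener with the primary's response
            sendToReplicas(new Runnable() {
                @Override
                public void run() {
                    listener.onResponse(finalResponse);
                }
            });
        }
    }
}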

File: org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

@@ -20,7 +20,6 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
@@ -100,7 +99,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncShardOperationAction(request, listener).start();
new PrimaryPhase(request, listener).run();
}
protected abstract Request newRequestInstance();
@@ -112,10 +111,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract String executor();
/**
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable;
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception;
@@ -296,149 +295,163 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected class AsyncShardOperationAction {
/**
* Responsible for performing all operations up to the point we start sending requests to replica shards.
* This includes forwarding the request to another node if the primary is not assigned locally.
* <p/>
* Note that as soon as we start sending requests to the replicas, state responsibility is transferred to the {@link ReplicationPhase}
*/
final class PrimaryPhase extends AbstractRunnable {
private final ActionListener<Response> listener;
private final InternalRequest internalRequest;
private volatile ShardIterator shardIt;
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
private volatile ClusterStateObserver observer;
private final ClusterStateObserver observer;
private final AtomicBoolean finished = new AtomicBoolean(false);
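// `finished` guarantees the listener is notified at most once, whichever finish* method wins the compareAndSet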
AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
PrimaryPhase(Request request, ActionListener<Response> listener) {
this.internalRequest = new InternalRequest(request);
this.listener = listener;
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger);
doStart();
}
/**
* Returns <tt>true</tt> if the action has started to be performed on the primary (or is done).
*/
protected void doStart() throws ElasticsearchException {
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return;
} else {
throw blockException;
}
}
if (resolveIndex()) {
internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
} else {
internalRequest.concreteIndex(internalRequest.request().index());
}
@Override
public void onFailure(Throwable e) {
finishWithUnexpectedFailure(e);
}
resolveRequest(observer.observedState(), internalRequest, listener);
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
return;
} else {
throw blockException;
}
}
shardIt = shards(observer.observedState(), internalRequest);
} catch (Throwable e) {
listener.onFailure(e);
protected void doRun() {
if (checkBlocks() == false) {
return;
}
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());
final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
final ShardRouting primary = resolvePrimary(shardIt);
if (primary == null) {
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
return;
}
if (primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
if (observer.observedState().nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to anode we do not know the node, scheduling a retry.", primary.shardId(), primary.currentNodeId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
routeRequestOrPerformLocally(primary, shardIt);
}
boolean foundPrimary = false;
ShardRouting shardX;
while ((shardX = shardIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
// we only deal with primary shardIt here...
if (!shard.primary()) {
continue;
}
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
return;
}
if (!primaryOperationStarted.compareAndSet(false, true)) {
return;
}
foundPrimary = true;
if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard);
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard);
}
} catch (Throwable t) {
listener.onFailure(t);
}
/**
* checks for any cluster state blocks. Returns <tt>true</tt> if the operation is OK to proceed.
* if <tt>false</tt> is returned, no further action is needed. The method takes care of any continuation, by either
* responding to the listener or scheduling a retry
*/
protected boolean checkBlocks() {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
} else {
DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
finishAsFailed(blockException);
}
return false;
}
if (resolveIndex()) {
internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
} else {
internalRequest.concreteIndex(internalRequest.request().index());
}
@Override
public Response newInstance() {
return newResponseInstance();
}
resolveRequest(observer.observedState(), internalRequest, listener);
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
blockException = checkRequestBlock(observer.observedState(), internalRequest);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(blockException);
} else {
finishAsFailed(blockException);
}
return false;
}
return true;
}
@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}
protected ShardRouting resolvePrimary(ShardIterator shardIt) {
// the shard iterator can be empty, e.g. in the window between index gateway recovery and shard routing initialization
ShardRouting shard;
while ((shard = shardIt.nextOrNull()) != null) {
// we only deal with the primary shard here...
if (shard.primary()) {
return shard;
}
}
return null;
}
@Override
public void handleException(TransportException exp) {
/** send the request to the node holding the primary or execute if local */
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (internalRequest.request().operationThreaded()) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
finishAsFailed(t);
}
@Override
protected void doRun() throws Exception {
performOnPrimary(primary, shardsIt);
}
});
} else {
performOnPrimary(primary, shardsIt);
}
} catch (Throwable t) {
// no commit: check threadpool rejection.
finishAsFailed(t);
}
} else {
DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId());
transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return newResponseInstance();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(Response response) {
finishOnRemoteSuccess(response);
}
@Override
public void handleException(TransportException exp) {
try {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
retryPrimaryException(exp)) {
primaryOperationStarted.set(false);
internalRequest.request().setCanHaveDuplicates();
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(exp);
} else {
listener.onFailure(exp);
finishAsFailed(exp);
}
} catch (Throwable t) {
finishWithUnexpectedFailure(t);
}
});
}
break;
}
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.trace("couldn't find a eligible primary shard, scheduling for retry.");
retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
}
});
}
}
@@ -446,7 +459,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
assert failure != null;
if (observer.isTimedOut()) {
// we are running as a last attempt after a timeout has happened. don't retry
listener.onFailure(failure);
finishAsFailed(failure);
return;
}
// make it a threaded operation so we fork on the discovery listener thread
@@ -455,238 +468,102 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart();
run();
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
finishAsFailed(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
// Try one more time...
doStart();
run();
}
});
}
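// note: retry() re-runs the whole PrimaryPhase on every relevant cluster state change (and once more on
// timeout), so blocks, primary resolution and write consistency are all re-checked against the new state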
void performOnPrimary(int primaryShardId, final ShardRouting shard) {
ClusterState clusterState = observer.observedState();
if (raiseFailureIfHaveNotEnoughActiveShardCopies(shard, clusterState)) {
/** upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase} */
void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
if (finished.compareAndSet(false, true)) {
replicationPhase.run();
} else {
assert false : "finishAndMoveToReplication called but operation is already finished";
}
}
void finishAsFailed(Throwable failure) {
if (finished.compareAndSet(false, true)) {
logger.trace("operation failed", failure);
listener.onFailure(failure);
} else {
assert false : "finishAsFailed called but operation is already finished";
}
}
void finishWithUnexpectedFailure(Throwable failure) {
logger.warn("unexpected error during the primary phase for action [{}]", failure, actionName);
if (finished.compareAndSet(false, true)) {
listener.onFailure(failure);
} else {
assert false : "finishWithUnexpectedFailure called but operation is already finished";
}
}
void finishOnRemoteSuccess(Response response) {
if (finished.compareAndSet(false, true)) {
logger.trace("operation succeeded");
listener.onResponse(response);
} else {
assert false : "finishOnRemoteSuccess called but operation is already finished";
}
}
/** perform the operation on the node holding the primary */
void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
final String writeConsistencyFailure = checkWriteConsistency(primary);
if (writeConsistencyFailure != null) {
retryBecauseUnavailable(primary.shardId(), writeConsistencyFailure);
return;
}
final ReplicationPhase replicationPhase;
try {
PrimaryOperationRequest por = new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request());
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(clusterState, por);
performReplicas(por, primaryResponse);
PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
logger.trace("operation completed on primary [{}]", primary);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener);
} catch (Throwable e) {
internalRequest.request.setCanHaveDuplicates();
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
primaryOperationStarted.set(false);
logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
retry(e);
return;
}
if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) {
if (logger.isTraceEnabled()) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(shard.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e);
}
}
listener.onFailure(e);
}
}
void performReplicas(PrimaryOperationRequest por, Tuple<Response, ReplicaRequest> primaryResponse) {
ShardRouting shard;
// we double check on the state, if it got changed we need to make sure we take the latest one cause
// maybe a replica shard started its recovery process and we need to apply it there...
// we also need to make sure if the new state has a new primary shard (that we indexed to before) started
// and assigned to another node (while the indexing happened). In that case, we want to apply it on the
// new primary shard as well...
ClusterState newState = clusterService.state();
ShardRouting newPrimaryShard = null;
int numberOfUnassignedReplicas = 0;
if (observer.observedState() != newState) {
shardIt.reset();
ShardRouting originalPrimaryShard = null;
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.primary()) {
originalPrimaryShard = shard;
break;
}
}
if (originalPrimaryShard == null || !originalPrimaryShard.active()) {
throw new ElasticsearchIllegalStateException("unexpected state, failed to find primary shard on an index operation that succeeded");
}
observer.reset(newState);
shardIt = shards(newState, internalRequest);
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.primary()) {
if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId())) {
newPrimaryShard = null;
} else {
newPrimaryShard = shard;
}
}
if (!shard.primary() && shard.unassigned()) {
numberOfUnassignedReplicas++;
}
}
shardIt.reset();
internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
} else {
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.state() != ShardRoutingState.STARTED) {
internalRequest.request().setCanHaveDuplicates();
}
if (!shard.primary() && shard.unassigned()) {
numberOfUnassignedReplicas++;
}
}
shardIt.reset();
}
int numberOfPendingShardInstances = shardIt.assignedReplicasIncludingRelocating();
if (newPrimaryShard != null) {
numberOfPendingShardInstances++;
}
ReplicationState replicationState = new ReplicationState(por, shardIt, primaryResponse.v1(), primaryResponse.v2(), listener, numberOfPendingShardInstances, numberOfUnassignedReplicas);
if (numberOfPendingShardInstances == 0) {
replicationState.forceFinish();
finishAsFailed(e);
return;
}
IndexMetaData indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
if (newPrimaryShard != null) {
performOnReplica(replicationState, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
}
shardIt.reset(); // reset the iterator
while ((shard = shardIt.nextOrNull()) != null) {
// if its unassigned, nothing to do here...
if (shard.unassigned()) {
continue;
}
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
// (and we already did it on the primary)
boolean doOnlyOnRelocating = false;
if (shard.primary()) {
if (shard.relocating()) {
doOnlyOnRelocating = true;
} else {
continue;
}
}
// we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an IllegalShardState exception if it's not started
// and that's fine, we will ignore it
if (!doOnlyOnRelocating) {
performOnReplica(replicationState, shard, shard.currentNodeId(), indexMetaData);
}
if (shard.relocating()) {
performOnReplica(replicationState, shard, shard.relocatingNodeId(), indexMetaData);
}
}
finishAndMoveToReplication(replicationPhase);
}
void performOnReplica(final ReplicationState state, final ShardRouting shard, final String nodeId, final IndexMetaData indexMetaData) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!observer.observedState().nodes().nodeExists(nodeId)) {
state.onReplicaFailure(nodeId, null);
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
state.onReplicaSuccess();
return;
}
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess();
}
@Override
public void handleException(TransportException exp) {
state.onReplicaFailure(nodeId, exp);
logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request());
if (!ignoreReplicaException(exp)) {
logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
});
} else {
if (internalRequest.request().operationThreaded()) {
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) {
state.onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
}
// we must never reject because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void onFailure(Throwable t) {
state.onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
state.onReplicaFailure(nodeId, e);
}
} else {
try {
shardOperationOnReplica(shardRequest);
state.onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
state.onReplicaFailure(nodeId, e);
}
}
}
}
boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) {
if (!checkWriteConsistency) {
return false;
/**
* checks whether we can perform a write based on the write consistency setting
* returns <tt>null</tt> if OK to proceed, or a string describing the reason to stop
*/
String checkWriteConsistency(ShardRouting shard) {
if (checkWriteConsistency == false) {
return null;
}
final WriteConsistencyLevel consistencyLevel;
@@ -697,11 +574,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shard.index());
IndexRoutingTable indexRoutingTable = observer.observedState().getRoutingTable().index(shard.index());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shard.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// quorum only makes sense if there are more than 2 shard copies, otherwise it's 1 shard with 1 replica and the quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
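// e.g. for 1 primary + 2 replicas the routing table size is 3, so the required quorum is 3/2 + 1 = 2 active copies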
@@ -722,24 +599,21 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (sizeActive < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, sizeActive, requiredNumber);
primaryOperationStarted.set(false);
// A dedicated exception would be nice...
retryBecauseUnavailable(shard.shardId(), "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").");
return true;
return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ").";
} else {
return false;
return null;
}
}
void retryBecauseUnavailable(ShardId shardId, String message) {
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() +"], request: " + internalRequest.request().toString()));
retry(new UnavailableShardsException(shardId, message + " Timeout: [" + internalRequest.request().timeout() + "], request: " + internalRequest.request().toString()));
}
}
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
logger.trace("failure on replica [{}][{}]", t, index, shardId);
if (!ignoreReplicaException(t)) {
if (ignoreReplicaException(t) == false) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
@@ -754,63 +628,276 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
public final class ReplicationState {
/** inner class responsible for sending the requests to all replica shards and managing the responses */
final class ReplicationPhase extends AbstractRunnable {
private final Request request;
private final ReplicaRequest replicaRequest;
private final Response finalResponse;
private final ShardId shardId;
private final ShardIterator shardIt;
private final ActionListener<Response> listener;
private final AtomicBoolean finished = new AtomicBoolean(false);
private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
private final IndexMetaData indexMetaData;
private final ShardRouting originalPrimaryShard;
private final AtomicInteger pending;
private final int numberOfShardInstances;
private final int totalShards;
private final ClusterStateObserver observer;
public ReplicationState(PrimaryOperationRequest por, ShardIterator shardsIter, Response finalResponse, ReplicaRequest replicaRequest, ActionListener<Response> listener, int numberOfPendingShardInstances, int numberOfUnassignedReplicas) {
this.request = por.request;
this.finalResponse = finalResponse;
/**
* The constructor doesn't take any action; it just calculates state. Call {@link #run()} to start
* replicating.
*/
public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse,
ClusterStateObserver observer, ShardRouting originalPrimaryShard,
InternalRequest internalRequest, ActionListener<Response> listener) {
this.replicaRequest = replicaRequest;
this.shardId = shardsIter.shardId();
this.listener = listener;
this.numberOfShardInstances = 1 + numberOfPendingShardInstances + numberOfUnassignedReplicas;
this.finalResponse = finalResponse;
this.originalPrimaryShard = originalPrimaryShard;
this.observer = observer;
indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
ShardRouting shard;
// we double check on the state, if it got changed we need to make sure we take the latest one because
// a replica shard may have started its recovery process and we need to apply the operation there as well...
// we also need to check whether the new state has a new primary shard (that we indexed to before), now started
// and assigned to another node (while the indexing happened). In that case, we want to apply the operation on the
// new primary shard as well...
ClusterState newState = clusterService.state();
int numberOfUnassignedOrShadowReplicas = 0;
int numberOfPendingShardInstances = 0;
if (observer.observedState() != newState) {
observer.reset(newState);
shardIt = shards(newState, internalRequest);
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.primary()) {
if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {
// there is a new primary, we'll have to replicate to it.
numberOfPendingShardInstances++;
}
if (shard.relocating()) {
numberOfPendingShardInstances++;
}
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
numberOfUnassignedOrShadowReplicas++;
} else if (shard.unassigned()) {
numberOfUnassignedOrShadowReplicas++;
} else if (shard.relocating()) {
// we need to send to two copies
numberOfPendingShardInstances += 2;
} else {
numberOfPendingShardInstances++;
}
}
internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
} else {
shardIt = originalShardIt;
shardIt.reset();
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.state() != ShardRoutingState.STARTED) {
replicaRequest.setCanHaveDuplicates();
}
if (shard.unassigned()) {
numberOfUnassignedOrShadowReplicas++;
} else if (shard.primary()) {
if (shard.relocating()) {
// we have to replicate to the other copy
numberOfPendingShardInstances += 1;
}
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
numberOfUnassignedOrShadowReplicas++;
} else if (shard.relocating()) {
// we need to send to two copies
numberOfPendingShardInstances += 2;
} else {
numberOfPendingShardInstances++;
}
}
}
// one for the primary already done
this.totalShards = 1 + numberOfPendingShardInstances + numberOfUnassignedOrShadowReplicas;
this.pending = new AtomicInteger(numberOfPendingShardInstances);
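// e.g. 1 primary, 2 started replicas and 1 unassigned replica give totalShards = 4 and pending = 2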
}
public Request request() {
return this.request;
/** total shard copies */
int totalShards() {
return totalShards;
}
public ReplicaRequest replicaRequest() {
return this.replicaRequest;
/** total successful operations so far */
int successful() {
return success.get();
}
public void onReplicaFailure(String nodeId, @Nullable Throwable e) {
/** number of pending operations */
int pending() {
return pending.get();
}
@Override
public void onFailure(Throwable t) {
logger.error("unexpected error while replicating for action [{}]. shard [{}]. ", t, actionName, shardIt.shardId());
forceFinishAsFailed(t);
}
/** start sending current requests to replicas */
@Override
protected void doRun() {
if (pending.get() == 0) {
doFinish();
return;
}
ShardRouting shard;
shardIt.reset(); // reset the iterator
while ((shard = shardIt.nextOrNull()) != null) {
// if it's unassigned, nothing to do here...
if (shard.unassigned()) {
continue;
}
// we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an IllegalShardState exception if it's not started
// and that's fine, we will ignore it
if (shard.primary()) {
if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) {
// there is a new primary, we'll have to replicate to it.
performOnReplica(shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId());
}
} else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) {
performOnReplica(shard, shard.currentNodeId());
if (shard.relocating()) {
performOnReplica(shard, shard.relocatingNodeId());
}
}
}
}
/** send operation to the given node or perform it if local */
void performOnReplica(final ShardRouting shard, final String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!observer.observedState().nodes().nodeExists(nodeId)) {
onReplicaFailure(nodeId, null);
return;
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest);
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
onReplicaSuccess();
}
@Override
public void handleException(TransportException exp) {
onReplicaFailure(nodeId, exp);
logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest);
if (ignoreReplicaException(exp) == false) {
logger.warn("failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp);
shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
"Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
}
}
});
} else {
if (replicaRequest.operationThreaded()) {
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
onReplicaSuccess();
} catch (Throwable e) {
onReplicaFailure(nodeId, e);
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
}
// we must never reject because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void onFailure(Throwable t) {
onReplicaFailure(nodeId, t);
}
});
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
} else {
try {
shardOperationOnReplica(shardRequest);
onReplicaSuccess();
} catch (Throwable e) {
failReplicaIfNeeded(shard.index(), shard.id(), e);
onReplicaFailure(nodeId, e);
}
}
}
}
void onReplicaFailure(String nodeId, @Nullable Throwable e) {
// Only version conflict should be ignored from being put into the _shards header?
if (e != null && !ignoreReplicaException(e)) {
if (e != null && ignoreReplicaException(e) == false) {
shardReplicaFailures.put(nodeId, e);
}
finishIfNeeded();
decPendingAndFinishIfNeeded();
}
public void onReplicaSuccess() {
void onReplicaSuccess() {
success.incrementAndGet();
finishIfNeeded();
decPendingAndFinishIfNeeded();
}
public void forceFinish() {
doFinish();
}
private void finishIfNeeded() {
if (pending.decrementAndGet() == 0) {
private void decPendingAndFinishIfNeeded() {
if (pending.decrementAndGet() <= 0) {
doFinish();
}
}
private void forceFinishAsFailed(Throwable t) {
if (finished.compareAndSet(false, true)) {
listener.onFailure(t);
}
}
private void doFinish() {
if (finished.compareAndSet(false, true)) {
final ShardId shardId = shardIt.shardId();
final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
if (!shardReplicaFailures.isEmpty()) {
int slot = 0;
@@ -824,9 +911,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} else {
failuresArray = ActionWriteResponse.EMPTY;
}
finalResponse.setShardInfo(
new ActionWriteResponse.ShardInfo(
numberOfShardInstances,
finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(
totalShards,
success.get(),
failuresArray

File: org/elasticsearch/common/transport/DummyTransportAddress.java

@@ -51,4 +51,9 @@ public class DummyTransportAddress implements TransportAddress {
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public String toString() {
return "_dummy_addr_";
}
}

File: org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java

@@ -0,0 +1,586 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*;
public class ShardReplicationOperationTests extends ElasticsearchTestCase {
private static ThreadPool threadPool;
private TestClusterService clusterService;
private TransportService transportService;
private CapturingTransport transport;
private Action action;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("ShardReplicationOperationTests");
}
@Before
public void setUp() throws Exception {
super.setUp();
transport = new CapturingTransport();
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
action = new Action(ImmutableSettings.EMPTY, "testAction", transportService, clusterService, threadPool);
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
<T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
try {
listener.get();
fail(msg);
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(klass));
}
}
@Test
public void testBlocks() throws ExecutionException, InterruptedException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ClusterBlocks.Builder block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
assertFalse("primary phase should stop execution", primaryPhase.checkBlocks());
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
listener = new PlainActionFuture<>();
primaryPhase = action.new PrimaryPhase(new Request().timeout("5ms"), listener);
assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
listener = new PlainActionFuture<>();
primaryPhase = action.new PrimaryPhase(new Request(), listener);
assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks());
assertFalse("primary phase should wait on retryable block", listener.isDone());
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertListenerThrows("primary phase should fail operation when moving from a retryable block a non-retryable one", listener, ClusterBlockException.class);
}
ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) {
int assignedReplicas = randomIntBetween(0, numberOfReplicas);
return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas);
}
ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) {
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
// no point in randomizing - node assignment later on does it too.
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
}
for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates);
}
ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) {
final int numberOfReplicas = replicaStates.length;
int numberOfNodes = numberOfReplicas + 1;
if (primaryState == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
for (ShardRoutingState state : replicaStates) {
if (state == ShardRoutingState.RELOCATING) {
numberOfNodes++;
}
}
numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures
final ShardId shardId = new ShardId(index, 0);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> unassignedNodes = new HashSet<>();
for (int i = 0; i < numberOfNodes + 1; i++) {
final DiscoveryNode node = newNode(i);
discoBuilder = discoBuilder.put(node);
unassignedNodes.add(node.id());
}
discoBuilder.localNodeId(newNode(0).id());
discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(ImmutableSettings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId, false);
String primaryNode = null;
String relocatingNode = null;
if (primaryState != ShardRoutingState.UNASSIGNED) {
if (primaryLocal) {
primaryNode = newNode(0).id();
unassignedNodes.remove(primaryNode);
} else {
primaryNode = selectAndRemove(unassignedNodes);
}
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
}
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, 0, primaryNode, relocatingNode, true, primaryState, 0));
for (ShardRoutingState replicaState : replicaStates) {
String replicaNode = null;
relocatingNode = null;
if (replicaState != ShardRoutingState.UNASSIGNED) {
assert primaryNode != null : "a replica is assigned but the primary isn't";
replicaNode = selectAndRemove(unassignedNodes);
if (replicaState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
}
indexShardRoutingBuilder.addShard(
new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, 0));
}
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build())));
return state.build();
}
private String selectAndRemove(Set<String> strings) {
String selection = randomFrom(strings.toArray(new String[strings.size()]));
strings.remove(selection);
return selection;
}
@Test
public void testNotStartedPrimary() throws InterruptedException, ExecutionException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
// no replicas in order to skip the replication part
clusterService.setState(state(index, true,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("1ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
request = new Request(shardId);
listener = new PlainActionFuture<>();
primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertFalse("unassigned primary didn't cause a retry", listener.isDone());
clusterService.setState(state(index, true, ShardRoutingState.STARTED));
logger.debug("--> primary assigned state:\n{}", clusterService.state().prettyPrint());
listener.get();
assertTrue("request wasn't processed on primary, despite of it being assigned", request.processedOnPrimary.get());
}
@Test
public void testRoutingToPrimary() {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(stateWithStartedPrimary(index, randomBoolean(), 3));
logger.debug("using state: \n{}", clusterService.state().prettyPrint());
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId();
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
assertTrue(primaryPhase.checkBlocks());
primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt());
if (primaryNodeId.equals(clusterService.localNode().id())) {
logger.info("--> primary is assigned locally, testing for execution");
assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get());
} else {
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction"));
}
}
@Test
public void testWriteConsistency() {
action = new ActionWithConsistency(ImmutableSettings.EMPTY, "testActionWithConsistency", transportService, clusterService, threadPool);
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
final int assignedReplicas = randomInt(2);
final int unassignedReplicas = randomInt(2);
final int totalShards = 1 + assignedReplicas + unassignedReplicas;
final boolean passesWriteConsistency;
Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values()));
switch (request.consistencyLevel()) {
case ONE:
passesWriteConsistency = true;
break;
case DEFAULT:
case QUORUM:
if (totalShards <= 2) {
passesWriteConsistency = true; // primary is enough
} else {
passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1;
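// e.g. 2 of 3 replicas assigned: 2 + 1 >= (4 / 2) + 1 = 3, so the quorum of 3 is met and the op proceeds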
}
break;
case ALL:
passesWriteConsistency = unassignedReplicas == 0;
break;
default:
throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]");
}
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
}
for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates));
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}",
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry",
clusterService.state().prettyPrint());
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
if (passesWriteConsistency) {
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
primaryPhase.run();
assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get());
} else {
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue());
primaryPhase.run();
assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get());
for (int i = 0; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.STARTED;
}
clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates));
assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get());
}
}
@Test
public void testReplication() throws ExecutionException, InterruptedException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
int assignedReplicas = 0;
int totalShards = 0;
for (ShardRouting shard : shardRoutingTable) {
totalShards++;
if (shard.primary() == false && shard.assignedToNode()) {
assignedReplicas++;
}
if (shard.relocating()) {
assignedReplicas++;
totalShards++;
}
}
runReplicateTest(shardRoutingTable, assignedReplicas, totalShards);
}
@Test
public void testReplicationWithShadowIndex() throws ExecutionException, InterruptedException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
ClusterState state = stateWithStartedPrimary(index, true, randomInt(5));
MetaData.Builder metaData = MetaData.builder(state.metaData());
ImmutableSettings.Builder settings = ImmutableSettings.builder().put(metaData.get(index).settings());
settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true);
metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings));
clusterService.setState(ClusterState.builder(state).metaData(metaData));
final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id());
int assignedReplicas = 0;
int totalShards = 0;
for (ShardRouting shard : shardRoutingTable) {
totalShards++;
if (shard.primary() && shard.relocating()) {
assignedReplicas++;
totalShards++;
}
}
runReplicateTest(shardRoutingTable, assignedReplicas, totalShards);
}
protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException {
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final ShardIterator shardIt = shardRoutingTable.shardsIt();
final ShardId shardId = shardIt.shardId();
final Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
final TransportShardReplicationOperationAction<Request, Request, Response>.InternalRequest internalRequest = action.new InternalRequest(request);
internalRequest.concreteIndex(shardId.index().name());
TransportShardReplicationOperationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(shardIt, request,
new Response(), new ClusterStateObserver(clusterService, logger),
primaryShard, internalRequest, listener);
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
replicationPhase.run();
final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
transport.clear();
assertThat(capturedRequests.length, equalTo(assignedReplicas));
if (assignedReplicas > 0) {
assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
}
int pending = replicationPhase.pending();
int criticalFailures = 0; // failures that should fail the shard
int successful = 1;
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
if (randomBoolean()) {
Throwable t;
if (randomBoolean()) {
t = new CorruptIndexException("simulated", (String) null);
criticalFailures++;
} else {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
transport.handleResponse(capturedRequest.requestId, t);
} else {
successful++;
transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);
}
pending--;
assertThat(replicationPhase.pending(), equalTo(pending));
assertThat(replicationPhase.successful(), equalTo(successful));
}
assertThat(listener.isDone(), equalTo(true));
Response response = listener.get();
final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(criticalFailures));
assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures));
assertThat(shardInfo.getSuccessful(), equalTo(successful));
assertThat(shardInfo.getTotal(), equalTo(totalShards));
assertThat("failed to see enough shard failures", transport.capturedRequests().length, equalTo(criticalFailures));
for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) {
assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
}
static class Request extends ShardReplicationOperationRequest<Request> {
int shardId;
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public AtomicInteger processedOnReplicas = new AtomicInteger();
Request() {
this.operationThreaded(false);
}
Request(ShardId shardId) {
this();
this.shardId = shardId.id();
this.index(shardId.index().name());
// keep things simple
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shardId);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();
}
}
static class Response extends ActionWriteResponse {
}
static class Action extends TransportShardReplicationOperationAction<Request, Request, Response> {
Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, null, threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null),
new ActionFilters(new HashSet<ActionFilter>()));
}
@Override
protected Request newRequestInstance() {
return new Request();
}
@Override
protected Request newReplicaRequestInstance() {
return new Request();
}
@Override
protected Response newResponseInstance() {
return new Response();
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
boolean executedBefore = shardRequest.request.processedOnPrimary.getAndSet(true);
assert executedBefore == false : "request has already been executed on the primary";
return new Tuple<>(new Response(), shardRequest.request);
}
@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
shardRequest.request.processedOnReplicas.incrementAndGet();
}
@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException {
return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().shardId).shardsIt();
}
@Override
protected boolean checkWriteConsistency() {
return false;
}
@Override
protected boolean resolveIndex() {
return false;
}
}
static class ActionWithConsistency extends Action {
ActionWithConsistency(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, threadPool);
}
@Override
protected boolean checkWriteConsistency() {
return true;
}
}
static DiscoveryNode newNode(int nodeId) {
return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
}

View File

@@ -0,0 +1,249 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/** a class that simulates simple cluster service features, like state storage and listeners */
public class TestClusterService implements ClusterService {
volatile ClusterState state;
private final Collection<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private final ThreadPool threadPool;
public TestClusterService() {
this(ClusterState.builder(new ClusterName("test")).build());
}
public TestClusterService(ThreadPool threadPool) {
this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
}
public TestClusterService(ClusterState state) {
this(state, null);
}
public TestClusterService(ClusterState state, @Nullable ThreadPool threadPool) {
if (state.getNodes().size() == 0) {
state = ClusterState.builder(state).nodes(
DiscoveryNodes.builder()
.put(new DiscoveryNode("test_id", DummyTransportAddress.INSTANCE, Version.CURRENT))
.localNodeId("test_id")).build();
}
assert state.getNodes().localNode() != null;
this.state = state;
this.threadPool = threadPool;
}
    /** set the current state and notify any registered listeners of the change */
public void setState(ClusterState state) {
assert state.getNodes().localNode() != null;
// make sure we have a version increment
state = ClusterState.builder(state).version(this.state.version() + 1).build();
ClusterChangedEvent event = new ClusterChangedEvent("test", state, this.state);
this.state = state;
for (ClusterStateListener listener : listeners) {
listener.clusterChanged(event);
}
}
    /** set the current state and notify any registered listeners of the change */
public void setState(ClusterState.Builder state) {
setState(state.build());
}
@Override
public DiscoveryNode localNode() {
return state.getNodes().localNode();
}
@Override
public ClusterState state() {
return state;
}
@Override
public void addInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException {
throw new UnsupportedOperationException();
}
@Override
public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException {
throw new UnsupportedOperationException();
}
@Override
public OperationRouting operationRouting() {
return null;
}
@Override
public void addFirst(ClusterStateListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void addLast(ClusterStateListener listener) {
listeners.add(listener);
}
@Override
public void add(ClusterStateListener listener) {
listeners.add(listener);
}
@Override
public void remove(ClusterStateListener listener) {
listeners.remove(listener);
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
NotifyTimeout timeout = it.next();
if (timeout.listener.equals(listener)) {
timeout.cancel();
it.remove();
}
}
}
@Override
public void add(LocalNodeMasterListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void remove(LocalNodeMasterListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void add(final TimeValue timeout, final TimeoutClusterStateListener listener) {
if (threadPool == null) {
throw new UnsupportedOperationException("TestClusterService wasn't initialized with a thread pool");
}
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout);
onGoingTimeouts.add(notifyTimeout);
listeners.add(listener);
listener.postAdded();
}
@Override
public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
throw new UnsupportedOperationException();
}
@Override
public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
throw new UnsupportedOperationException();
}
@Override
public List<PendingClusterTask> pendingTasks() {
throw new UnsupportedOperationException();
}
@Override
public int numberOfPendingTasks() {
throw new UnsupportedOperationException();
}
@Override
public Lifecycle.State lifecycleState() {
throw new UnsupportedOperationException();
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
throw new UnsupportedOperationException();
}
@Override
public ClusterService start() throws ElasticsearchException {
throw new UnsupportedOperationException();
}
@Override
public ClusterService stop() throws ElasticsearchException {
throw new UnsupportedOperationException();
}
@Override
public void close() throws ElasticsearchException {
throw new UnsupportedOperationException();
}
class NotifyTimeout implements Runnable {
final TimeoutClusterStateListener listener;
final TimeValue timeout;
volatile ScheduledFuture future;
NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) {
this.listener = listener;
this.timeout = timeout;
}
public void cancel() {
FutureUtils.cancel(future);
}
@Override
public void run() {
if (future != null && future.isCancelled()) {
return;
}
listener.onTimeout(this.timeout);
            // note: we rely on the listener to remove itself once it times out, if needed
}
}
}
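
A minimal usage sketch (an illustration under assumed imports, not part of this commit): a test can publish a new state and observe its registered listeners fire:

TestClusterService clusterService = new TestClusterService();
final AtomicBoolean notified = new AtomicBoolean();
clusterService.add(new ClusterStateListener() {
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        notified.set(true); // invoked synchronously by setState below
    }
});
// bumps the state version and notifies all registered listeners
clusterService.setState(ClusterState.builder(clusterService.state()));
assert notified.get();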

View File

@@ -0,0 +1,180 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
/** A transport class that doesn't send anything but rather captures all requests for inspection by tests */
public class CapturingTransport implements Transport {
private TransportServiceAdapter adapter;
    public static class CapturedRequest {
        public final DiscoveryNode node;
        public final long requestId;
        public final String action;
        public final TransportRequest request;
public CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) {
this.node = node;
this.requestId = requestId;
this.action = action;
this.request = request;
}
}
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
public CapturedRequest[] capturedRequests() {
return capturedRequests.toArray(new CapturedRequest[0]);
}
/**
* returns all requests captured so far, grouped by target node.
* Doesn't clear the captured request list. See {@link #clear()}
*/
public Map<String, List<CapturedRequest>> capturedRequestsByTargetNode() {
Map<String, List<CapturedRequest>> map = new HashMap<>();
for (CapturedRequest request : capturedRequests) {
List<CapturedRequest> nodeList = map.get(request.node.id());
if (nodeList == null) {
nodeList = new ArrayList<>();
map.put(request.node.id(), nodeList);
}
nodeList.add(request);
}
return map;
}
/** clears captured requests */
public void clear() {
capturedRequests.clear();
}
/** simulate a response for the given requestId */
public void handleResponse(final long requestId, final TransportResponse response) {
adapter.onResponseReceived(requestId).handleResponse(response);
}
    /** simulate a remote error for the given requestId */
public void handleResponse(final long requestId, final Throwable t) {
adapter.onResponseReceived(requestId).handleException(new RemoteTransportException("remote failure", t));
}
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
@Override
public void transportServiceAdapter(TransportServiceAdapter adapter) {
this.adapter = adapter;
}
@Override
public BoundTransportAddress boundAddress() {
return null;
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return null;
}
@Override
public TransportAddress[] addressesFromString(String address) throws Exception {
return new TransportAddress[0];
}
@Override
public boolean addressSupported(Class<? extends TransportAddress> address) {
return false;
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return true;
}
@Override
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
}
@Override
public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
}
@Override
public long serverOpen() {
return 0;
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
}
@Override
public Transport start() throws ElasticsearchException {
return null;
}
@Override
public Transport stop() throws ElasticsearchException {
return null;
}
@Override
public void close() throws ElasticsearchException {
}
}
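
A minimal usage sketch (an illustration, not part of this commit): a test wires the capturing transport into the code under test, then inspects and answers whatever was sent:

CapturingTransport transport = new CapturingTransport();
// ... build a TransportService on top of `transport` and trigger the action under test ...
for (CapturingTransport.CapturedRequest captured : transport.capturedRequests()) {
    if ("internal:some/action".equals(captured.action)) { // hypothetical action name
        // simulate a successful, empty response from the target node
        transport.handleResponse(captured.requestId, TransportResponse.Empty.INSTANCE);
    } else {
        // simulate a remote failure; it surfaces to the sender as a RemoteTransportException
        transport.handleResponse(captured.requestId, new RuntimeException("simulated"));
    }
}
transport.clear(); // reset the capture list before the next round of assertions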