diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 9c7b65bc939..529586927fd 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.*; @@ -100,7 +99,7 @@ public abstract class TransportShardReplicationOperationAction listener) { - new AsyncShardOperationAction(request, listener).start(); + new PrimaryPhase(request, listener).run(); } protected abstract Request newRequestInstance(); @@ -112,10 +111,10 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; + protected abstract Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest) throws Exception; @@ -296,149 +295,163 @@ public abstract class TransportShardReplicationOperationAction + * Note that as soon as we start sending request to replicas, state responsibility is transferred to {@link ReplicationPhase} + */ + final class PrimaryPhase extends AbstractRunnable { private final ActionListener listener; private final InternalRequest internalRequest; - private volatile ShardIterator shardIt; - private final AtomicBoolean primaryOperationStarted = new AtomicBoolean(); - private volatile ClusterStateObserver observer; + private final ClusterStateObserver observer; + private final AtomicBoolean finished = new AtomicBoolean(false); - AsyncShardOperationAction(Request request, ActionListener listener) { + + PrimaryPhase(Request request, ActionListener listener) { this.internalRequest = new InternalRequest(request); this.listener = listener; - } - - public void start() { this.observer = new ClusterStateObserver(clusterService, internalRequest.request().timeout(), logger); - doStart(); } - /** - * Returns true if the action starting to be performed on the primary (or is done). 
-     */
-    protected void doStart() throws ElasticsearchException {
-        try {
-            ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
-            if (blockException != null) {
-                if (blockException.retryable()) {
-                    logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
-                    retry(blockException);
-                    return;
-                } else {
-                    throw blockException;
-                }
-            }
-            if (resolveIndex()) {
-                internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
-            } else {
-                internalRequest.concreteIndex(internalRequest.request().index());
-            }
+        @Override
+        public void onFailure(Throwable e) {
+            finishWithUnexpectedFailure(e);
+        }
-            resolveRequest(observer.observedState(), internalRequest, listener);
-
-            blockException = checkRequestBlock(observer.observedState(), internalRequest);
-            if (blockException != null) {
-                if (blockException.retryable()) {
-                    logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
-                    retry(blockException);
-                    return;
-                } else {
-                    throw blockException;
-                }
-            }
-            shardIt = shards(observer.observedState(), internalRequest);
-        } catch (Throwable e) {
-            listener.onFailure(e);
+        protected void doRun() {
+            if (checkBlocks() == false) {
                 return;
             }
-
-        // no shardIt, might be in the case between index gateway recovery and shardIt initialization
-        if (shardIt.size() == 0) {
-            logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());
+            final ShardIterator shardIt = shards(observer.observedState(), internalRequest);
+            final ShardRouting primary = resolvePrimary(shardIt);
+            if (primary == null) {
                 retryBecauseUnavailable(shardIt.shardId(), "No active shards.");
                 return;
             }
+            if (primary.active() == false) {
+                logger.trace("primary shard [{}] is not yet active, scheduling a retry.", primary.shardId());
+                retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
+                return;
+            }
+            if (observer.observedState().nodes().nodeExists(primary.currentNodeId()) == false) {
+                logger.trace("primary shard [{}] is assigned to a node [{}] we do not know, scheduling a retry.", primary.shardId(), primary.currentNodeId());
+                retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
+                return;
+            }
+            routeRequestOrPerformLocally(primary, shardIt);
+        }
-            boolean foundPrimary = false;
-            ShardRouting shardX;
-            while ((shardX = shardIt.nextOrNull()) != null) {
-                final ShardRouting shard = shardX;
-                // we only deal with primary shardIt here...
-                if (!shard.primary()) {
-                    continue;
-                }
-                if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
-                    logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
-                    retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node.");
-                    return;
-                }
-
-                if (!primaryOperationStarted.compareAndSet(false, true)) {
-                    return;
-                }
-
-                foundPrimary = true;
-                if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
-                    try {
-                        if (internalRequest.request().operationThreaded()) {
-                            threadPool.executor(executor).execute(new Runnable() {
-                                @Override
-                                public void run() {
-                                    try {
-                                        performOnPrimary(shard.id(), shard);
-                                    } catch (Throwable t) {
-                                        listener.onFailure(t);
-                                    }
-                                }
-                            });
-                        } else {
-                            performOnPrimary(shard.id(), shard);
-                        }
-                    } catch (Throwable t) {
-                        listener.onFailure(t);
-                    }
+        /**
+         * checks for any cluster state blocks. Returns true if the operation is OK to proceed.
+         * if false is returned, no further action is needed. The method takes care of any continuation, by either
+         * responding to the listener or scheduling a retry
+         */
+        protected boolean checkBlocks() {
+            ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
+            if (blockException != null) {
+                if (blockException.retryable()) {
+                    logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
+                    retry(blockException);
                 } else {
-                    DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
-                    transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() {
+                    finishAsFailed(blockException);
+                }
+                return false;
+            }
+            if (resolveIndex()) {
+                internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions()));
+            } else {
+                internalRequest.concreteIndex(internalRequest.request().index());
+            }
-                        @Override
-                        public Response newInstance() {
-                            return newResponseInstance();
-                        }
+            resolveRequest(observer.observedState(), internalRequest, listener);
-                        @Override
-                        public String executor() {
-                            return ThreadPool.Names.SAME;
-                        }
+            blockException = checkRequestBlock(observer.observedState(), internalRequest);
+            if (blockException != null) {
+                if (blockException.retryable()) {
+                    logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
+                    retry(blockException);
+                } else {
+                    finishAsFailed(blockException);
+                }
+                return false;
+            }
+            return true;
+        }
-                        @Override
-                        public void handleResponse(Response response) {
-                            listener.onResponse(response);
-                        }
+        protected ShardRouting resolvePrimary(ShardIterator shardIt) {
+            // no shardIt, might be in the case between index gateway recovery and shardIt initialization
+            ShardRouting shard;
+            while ((shard = shardIt.nextOrNull()) != null) {
+                // we only deal with primary shardIt here...
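+                // (replica copies are skipped here; once the primary operation succeeds they are handled
+                //  separately by the ReplicationPhase introduced below)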
+ if (shard.primary()) { + return shard; + } + } + return null; + } - @Override - public void handleException(TransportException exp) { + /** send the request to the node holding the primary or execute if local */ + protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) { + if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { + try { + if (internalRequest.request().operationThreaded()) { + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + public void onFailure(Throwable t) { + finishAsFailed(t); + } + + @Override + protected void doRun() throws Exception { + performOnPrimary(primary, shardsIt); + } + }); + } else { + performOnPrimary(primary, shardsIt); + } + } catch (Throwable t) { + // no commit: check threadpool rejection. + finishAsFailed(t); + } + } else { + DiscoveryNode node = observer.observedState().nodes().get(primary.currentNodeId()); + transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler() { + + @Override + public Response newInstance() { + return newResponseInstance(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(Response response) { + finishOnRemoteSuccess(response); + } + + @Override + public void handleException(TransportException exp) { + try { // if we got disconnected from the node, or the node / shard is not in the right state (being closed) if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || retryPrimaryException(exp)) { - primaryOperationStarted.set(false); internalRequest.request().setCanHaveDuplicates(); // we already marked it as started when we executed it (removed the listener) so pass false // to re-add to the cluster listener logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); retry(exp); } else { - listener.onFailure(exp); + finishAsFailed(exp); } + } catch (Throwable t) { + finishWithUnexpectedFailure(t); } - }); - } - break; - } - // we won't find a primary if there are no shards in the shard iterator, retry... 
- if (!foundPrimary) { - logger.trace("couldn't find a eligible primary shard, scheduling for retry."); - retryBecauseUnavailable(shardIt.shardId(), "No active shards."); + } + }); } } @@ -446,7 +459,7 @@ public abstract class TransportShardReplicationOperationAction primaryResponse = shardOperationOnPrimary(clusterState, por); - performReplicas(por, primaryResponse); + PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request()); + Tuple primaryResponse = shardOperationOnPrimary(observer.observedState(), por); + logger.trace("operation completed on primary [{}]", primary); + replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener); } catch (Throwable e) { internalRequest.request.setCanHaveDuplicates(); // shard has not been allocated yet, retry it here if (retryPrimaryException(e)) { - primaryOperationStarted.set(false); logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage()); retry(e); return; } if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { if (logger.isTraceEnabled()) { - logger.trace(shard.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); + logger.trace(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); } } else { if (logger.isDebugEnabled()) { - logger.debug(shard.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); + logger.debug(primary.shortSummary() + ": Failed to execute [" + internalRequest.request() + "]", e); } } - listener.onFailure(e); - } - } - - void performReplicas(PrimaryOperationRequest por, Tuple primaryResponse) { - ShardRouting shard; - // we double check on the state, if it got changed we need to make sure we take the latest one cause - // maybe a replica shard started its recovery process and we need to apply it there... - - // we also need to make sure if the new state has a new primary shard (that we indexed to before) started - // and assigned to another node (while the indexing happened). In that case, we want to apply it on the - // new primary shard as well... 
- ClusterState newState = clusterService.state(); - ShardRouting newPrimaryShard = null; - int numberOfUnassignedReplicas = 0; - if (observer.observedState() != newState) { - shardIt.reset(); - ShardRouting originalPrimaryShard = null; - while ((shard = shardIt.nextOrNull()) != null) { - if (shard.primary()) { - originalPrimaryShard = shard; - break; - } - } - if (originalPrimaryShard == null || !originalPrimaryShard.active()) { - throw new ElasticsearchIllegalStateException("unexpected state, failed to find primary shard on an index operation that succeeded"); - } - - observer.reset(newState); - shardIt = shards(newState, internalRequest); - while ((shard = shardIt.nextOrNull()) != null) { - if (shard.primary()) { - if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId())) { - newPrimaryShard = null; - } else { - newPrimaryShard = shard; - } - } - - if (!shard.primary() && shard.unassigned()) { - numberOfUnassignedReplicas++; - } - } - shardIt.reset(); - internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups - } else { - shardIt.reset(); - while ((shard = shardIt.nextOrNull()) != null) { - if (shard.state() != ShardRoutingState.STARTED) { - internalRequest.request().setCanHaveDuplicates(); - } - if (!shard.primary() && shard.unassigned()) { - numberOfUnassignedReplicas++; - } - } - shardIt.reset(); - } - - int numberOfPendingShardInstances = shardIt.assignedReplicasIncludingRelocating(); - if (newPrimaryShard != null) { - numberOfPendingShardInstances++; - } - ReplicationState replicationState = new ReplicationState(por, shardIt, primaryResponse.v1(), primaryResponse.v2(), listener, numberOfPendingShardInstances, numberOfUnassignedReplicas); - if (numberOfPendingShardInstances == 0) { - replicationState.forceFinish(); + finishAsFailed(e); return; } - IndexMetaData indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex()); - if (newPrimaryShard != null) { - performOnReplica(replicationState, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData); - } - - shardIt.reset(); // reset the iterator - while ((shard = shardIt.nextOrNull()) != null) { - // if its unassigned, nothing to do here... - if (shard.unassigned()) { - continue; - } - - // if the shard is primary and relocating, add one to the counter since we perform it on the replica as well - // (and we already did it on the primary) - boolean doOnlyOnRelocating = false; - if (shard.primary()) { - if (shard.relocating()) { - doOnlyOnRelocating = true; - } else { - continue; - } - } - // we index on a replica that is initializing as well since we might not have got the event - // yet that it was started. 
We will get an exception IllegalShardState exception if its not started - // and that's fine, we will ignore it - if (!doOnlyOnRelocating) { - performOnReplica(replicationState, shard, shard.currentNodeId(), indexMetaData); - } - if (shard.relocating()) { - performOnReplica(replicationState, shard, shard.relocatingNodeId(), indexMetaData); - } - } + finishAndMoveToReplication(replicationPhase); } - void performOnReplica(final ReplicationState state, final ShardRouting shard, final String nodeId, final IndexMetaData indexMetaData) { - // if we don't have that node, it means that it might have failed and will be created again, in - // this case, we don't have to do the operation, and just let it failover - if (!observer.observedState().nodes().nodeExists(nodeId)) { - state.onReplicaFailure(nodeId, null); - return; - } - - final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest()); - - // If the replicas use shadow replicas, there is no reason to - // perform the action on the replica, so skip it and - // immediately return - if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { - // this delays mapping updates on replicas because they have - // to wait until they get the new mapping through the cluster - // state, which is why we recommend pre-defined mappings for - // indices using shadow replicas - state.onReplicaSuccess(); - return; - } - - if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { - final DiscoveryNode node = observer.observedState().nodes().get(nodeId); - transportService.sendRequest(node, transportReplicaAction, shardRequest, - transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty vResponse) { - state.onReplicaSuccess(); - } - - @Override - public void handleException(TransportException exp) { - state.onReplicaFailure(nodeId, exp); - logger.trace("[{}] Transport failure during replica request [{}] ", exp, node, internalRequest.request()); - if (!ignoreReplicaException(exp)) { - logger.warn("Failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); - shardStateAction.shardFailed(shard, indexMetaData.getUUID(), - "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); - } - } - - }); - } else { - if (internalRequest.request().operationThreaded()) { - try { - threadPool.executor(executor).execute(new AbstractRunnable() { - @Override - protected void doRun() { - try { - shardOperationOnReplica(shardRequest); - state.onReplicaSuccess(); - } catch (Throwable e) { - state.onReplicaFailure(nodeId, e); - failReplicaIfNeeded(shard.index(), shard.id(), e); - } - } - - // we must never reject on because of thread pool capacity on replicas - @Override - public boolean isForceExecution() { - return true; - } - - @Override - public void onFailure(Throwable t) { - state.onReplicaFailure(nodeId, t); - } - }); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - state.onReplicaFailure(nodeId, e); - } - } else { - try { - shardOperationOnReplica(shardRequest); - state.onReplicaSuccess(); - } catch (Throwable e) { - failReplicaIfNeeded(shard.index(), shard.id(), e); - state.onReplicaFailure(nodeId, e); - } - } - } - } - - boolean raiseFailureIfHaveNotEnoughActiveShardCopies(ShardRouting shard, ClusterState state) { - if (!checkWriteConsistency) { - return false; + /** + * checks whether we can perform 
a write based on the write consistency setting + * returns **null* if OK to proceed, or a string describing the reason to stop + */ + String checkWriteConsistency(ShardRouting shard) { + if (checkWriteConsistency == false) { + return null; } final WriteConsistencyLevel consistencyLevel; @@ -697,11 +574,11 @@ public abstract class TransportShardReplicationOperationAction 2) { // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) requiredNumber = (shardRoutingTable.getSize() / 2) + 1; @@ -722,24 +599,21 @@ public abstract class TransportShardReplicationOperationAction listener; private final AtomicBoolean finished = new AtomicBoolean(false); private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard private final ConcurrentMap shardReplicaFailures = ConcurrentCollections.newConcurrentMap(); - + private final IndexMetaData indexMetaData; + private final ShardRouting originalPrimaryShard; private final AtomicInteger pending; - private final int numberOfShardInstances; + private final int totalShards; + private final ClusterStateObserver observer; - public ReplicationState(PrimaryOperationRequest por, ShardIterator shardsIter, Response finalResponse, ReplicaRequest replicaRequest, ActionListener listener, int numberOfPendingShardInstances, int numberOfUnassignedReplicas) { - this.request = por.request; - this.finalResponse = finalResponse; + /** + * the constructor doesn't take any action, just calculates state. Call {@link #run()} to start + * replicating. + */ + public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse, + ClusterStateObserver observer, ShardRouting originalPrimaryShard, + InternalRequest internalRequest, ActionListener listener) { this.replicaRequest = replicaRequest; - this.shardId = shardsIter.shardId(); this.listener = listener; - this.numberOfShardInstances = 1 + numberOfPendingShardInstances + numberOfUnassignedReplicas; + this.finalResponse = finalResponse; + this.originalPrimaryShard = originalPrimaryShard; + this.observer = observer; + indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex()); + + ShardRouting shard; + // we double check on the state, if it got changed we need to make sure we take the latest one cause + // maybe a replica shard started its recovery process and we need to apply it there... + + // we also need to make sure if the new state has a new primary shard (that we indexed to before) started + // and assigned to another node (while the indexing happened). In that case, we want to apply it on the + // new primary shard as well... + ClusterState newState = clusterService.state(); + + int numberOfUnassignedOrShadowReplicas = 0; + int numberOfPendingShardInstances = 0; + if (observer.observedState() != newState) { + observer.reset(newState); + shardIt = shards(newState, internalRequest); + while ((shard = shardIt.nextOrNull()) != null) { + if (shard.primary()) { + if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) { + // there is a new primary, we'll have to replicate to it. 
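+                        // (illustrative arithmetic for this whole counting pass, assuming a non-shadow index
+                        //  whose primary neither moved nor relocates: with one started replica, one relocating
+                        //  replica and one unassigned replica, pending = 1 + 2 = 3 and unassignedOrShadow = 1,
+                        //  so totalShards below = 1 (primary) + 3 + 1 = 5)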
+ numberOfPendingShardInstances++; + } + if (shard.relocating()) { + numberOfPendingShardInstances++; + } + } else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { + // If the replicas use shadow replicas, there is no reason to + // perform the action on the replica, so skip it and + // immediately return + + // this delays mapping updates on replicas because they have + // to wait until they get the new mapping through the cluster + // state, which is why we recommend pre-defined mappings for + // indices using shadow replicas + numberOfUnassignedOrShadowReplicas++; + } else if (shard.unassigned()) { + numberOfUnassignedOrShadowReplicas++; + } else if (shard.relocating()) { + // we need to send to two copies + numberOfPendingShardInstances += 2; + } else { + numberOfPendingShardInstances++; + } + } + internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups + } else { + shardIt = originalShardIt; + shardIt.reset(); + while ((shard = shardIt.nextOrNull()) != null) { + if (shard.state() != ShardRoutingState.STARTED) { + replicaRequest.setCanHaveDuplicates(); + } + if (shard.unassigned()) { + numberOfUnassignedOrShadowReplicas++; + } else if (shard.primary()) { + if (shard.relocating()) { + // we have to replicate to the other copy + numberOfPendingShardInstances += 1; + } + } else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) { + // If the replicas use shadow replicas, there is no reason to + // perform the action on the replica, so skip it and + // immediately return + + // this delays mapping updates on replicas because they have + // to wait until they get the new mapping through the cluster + // state, which is why we recommend pre-defined mappings for + // indices using shadow replicas + numberOfUnassignedOrShadowReplicas++; + } else if (shard.relocating()) { + // we need to send to two copies + numberOfPendingShardInstances += 2; + } else { + numberOfPendingShardInstances++; + } + } + } + + // one for the primary already done + this.totalShards = 1 + numberOfPendingShardInstances + numberOfUnassignedOrShadowReplicas; this.pending = new AtomicInteger(numberOfPendingShardInstances); } - public Request request() { - return this.request; + /** total shard copies */ + int totalShards() { + return totalShards; } - public ReplicaRequest replicaRequest() { - return this.replicaRequest; + /** total successful operations so far */ + int successful() { + return success.get(); } - public void onReplicaFailure(String nodeId, @Nullable Throwable e) { + /** number of pending operations */ + int pending() { + return pending.get(); + } + + @Override + public void onFailure(Throwable t) { + logger.error("unexpected error while replicating for action [{}]. shard [{}]. ", t, actionName, shardIt.shardId()); + forceFinishAsFailed(t); + } + + /** start sending current requests to replicas */ + @Override + protected void doRun() { + if (pending.get() == 0) { + doFinish(); + return; + } + ShardRouting shard; + shardIt.reset(); // reset the iterator + while ((shard = shardIt.nextOrNull()) != null) { + // if its unassigned, nothing to do here... + if (shard.unassigned()) { + continue; + } + + // we index on a replica that is initializing as well since we might not have got the event + // yet that it was started. 
We will get an exception IllegalShardState exception if its not started + // and that's fine, we will ignore it + if (shard.primary()) { + if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId()) == false) { + // there is a new primary, we'll have to replicate to it. + performOnReplica(shard, shard.currentNodeId()); + } + if (shard.relocating()) { + performOnReplica(shard, shard.relocatingNodeId()); + } + } else if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings()) == false) { + performOnReplica(shard, shard.currentNodeId()); + if (shard.relocating()) { + performOnReplica(shard, shard.relocatingNodeId()); + } + } + } + } + + /** send operation to the given node or perform it if local */ + void performOnReplica(final ShardRouting shard, final String nodeId) { + // if we don't have that node, it means that it might have failed and will be created again, in + // this case, we don't have to do the operation, and just let it failover + if (!observer.observedState().nodes().nodeExists(nodeId)) { + onReplicaFailure(nodeId, null); + return; + } + + final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), replicaRequest); + + if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { + final DiscoveryNode node = observer.observedState().nodes().get(nodeId); + transportService.sendRequest(node, transportReplicaAction, shardRequest, + transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + @Override + public void handleResponse(TransportResponse.Empty vResponse) { + onReplicaSuccess(); + } + + @Override + public void handleException(TransportException exp) { + onReplicaFailure(nodeId, exp); + logger.trace("[{}] transport failure during replica request [{}] ", exp, node, replicaRequest); + if (ignoreReplicaException(exp) == false) { + logger.warn("failed to perform " + actionName + " on remote replica " + node + shardIt.shardId(), exp); + shardStateAction.shardFailed(shard, indexMetaData.getUUID(), + "Failed to perform [" + actionName + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]"); + } + } + + }); + } else { + if (replicaRequest.operationThreaded()) { + try { + threadPool.executor(executor).execute(new AbstractRunnable() { + @Override + protected void doRun() { + try { + shardOperationOnReplica(shardRequest); + onReplicaSuccess(); + } catch (Throwable e) { + onReplicaFailure(nodeId, e); + failReplicaIfNeeded(shard.index(), shard.id(), e); + } + } + + // we must never reject on because of thread pool capacity on replicas + @Override + public boolean isForceExecution() { + return true; + } + + @Override + public void onFailure(Throwable t) { + onReplicaFailure(nodeId, t); + } + }); + } catch (Throwable e) { + failReplicaIfNeeded(shard.index(), shard.id(), e); + onReplicaFailure(nodeId, e); + } + } else { + try { + shardOperationOnReplica(shardRequest); + onReplicaSuccess(); + } catch (Throwable e) { + failReplicaIfNeeded(shard.index(), shard.id(), e); + onReplicaFailure(nodeId, e); + } + } + } + } + + + void onReplicaFailure(String nodeId, @Nullable Throwable e) { // Only version conflict should be ignored from being put into the _shards header? 
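+            // (the tests below exercise both kinds of replica failure: an IndexShardNotStartedException is
+            //  ignorable and is not recorded here, while e.g. a CorruptIndexException is added to the failures
+            //  map and also triggers shardStateAction.shardFailed for that replica)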
- if (e != null && !ignoreReplicaException(e)) { + if (e != null && ignoreReplicaException(e) == false) { shardReplicaFailures.put(nodeId, e); } - finishIfNeeded(); + decPendingAndFinishIfNeeded(); } - public void onReplicaSuccess() { + void onReplicaSuccess() { success.incrementAndGet(); - finishIfNeeded(); + decPendingAndFinishIfNeeded(); } - public void forceFinish() { - doFinish(); - } - - private void finishIfNeeded() { - if (pending.decrementAndGet() == 0) { + private void decPendingAndFinishIfNeeded() { + if (pending.decrementAndGet() <= 0) { doFinish(); } } + private void forceFinishAsFailed(Throwable t) { + if (finished.compareAndSet(false, true)) { + listener.onFailure(t); + } + } + private void doFinish() { if (finished.compareAndSet(false, true)) { + final ShardId shardId = shardIt.shardId(); final ActionWriteResponse.ShardInfo.Failure[] failuresArray; if (!shardReplicaFailures.isEmpty()) { int slot = 0; @@ -824,9 +911,8 @@ public abstract class TransportShardReplicationOperationAction void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { + try { + listener.get(); + fail(msg); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(klass)); + } + + } + + @Test + public void testBlocks() throws ExecutionException, InterruptedException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + + ClusterBlocks.Builder block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + assertFalse("primary phase should stop execution", primaryPhase.checkBlocks()); + assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + listener = new PlainActionFuture<>(); + primaryPhase = action.new PrimaryPhase(new Request().timeout("5ms"), listener); + assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks()); + assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); + + + listener = new PlainActionFuture<>(); + primaryPhase = action.new PrimaryPhase(new Request(), listener); + assertFalse("primary phase should stop execution on retryable block", primaryPhase.checkBlocks()); + assertFalse("primary phase should wait on retryable block", listener.isDone()); + + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); + assertListenerThrows("primary phase should fail operation when moving from a retryable block a non-retryable one", listener, ClusterBlockException.class); + } + + ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { + int assignedReplicas = randomIntBetween(0, numberOfReplicas); + return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); + } 
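+    // example (hypothetical values): stateWithStartedPrimary("test", true, 1, 1) yields a one-shard index
+    // whose primary is started or relocating on the local node, with one assigned and one unassigned replica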
+ + ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; + // no point in randomizing - node assignment later on does it too. + for (int i = 0; i < assignedReplicas; i++) { + replicaStates[i] = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); + } + for (int i = assignedReplicas; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.UNASSIGNED; + } + return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); + + } + + ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { + final int numberOfReplicas = replicaStates.length; + + int numberOfNodes = numberOfReplicas + 1; + if (primaryState == ShardRoutingState.RELOCATING) { + numberOfNodes++; + } + for (ShardRoutingState state : replicaStates) { + if (state == ShardRoutingState.RELOCATING) { + numberOfNodes++; + } + } + numberOfNodes = Math.max(2, numberOfNodes); // we need a non-local master to test shard failures + final ShardId shardId = new ShardId(index, 0); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set unassignedNodes = new HashSet<>(); + for (int i = 0; i < numberOfNodes + 1; i++) { + final DiscoveryNode node = newNode(i); + discoBuilder = discoBuilder.put(node); + unassignedNodes.add(node.id()); + } + discoBuilder.localNodeId(newNode(0).id()); + discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures + IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(ImmutableSettings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); + + RoutingTable.Builder routing = new RoutingTable.Builder(); + routing.addAsNew(indexMetaData); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId, false); + + String primaryNode = null; + String relocatingNode = null; + if (primaryState != ShardRoutingState.UNASSIGNED) { + if (primaryLocal) { + primaryNode = newNode(0).id(); + unassignedNodes.remove(primaryNode); + } else { + primaryNode = selectAndRemove(unassignedNodes); + } + if (primaryState == ShardRoutingState.RELOCATING) { + relocatingNode = selectAndRemove(unassignedNodes); + } + } + indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, 0, primaryNode, relocatingNode, true, primaryState, 0)); + + for (ShardRoutingState replicaState : replicaStates) { + String replicaNode = null; + relocatingNode = null; + if (replicaState != ShardRoutingState.UNASSIGNED) { + assert primaryNode != null : "a replica is assigned but the primary isn't"; + replicaNode = selectAndRemove(unassignedNodes); + if (replicaState == ShardRoutingState.RELOCATING) { + relocatingNode = selectAndRemove(unassignedNodes); + } + } + indexShardRoutingBuilder.addShard( + new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, 0)); + + } + + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metaData(MetaData.builder().put(indexMetaData, false).generateUuidIfNeeded()); + 
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build()))); + return state.build(); + } + + private String selectAndRemove(Set strings) { + String selection = randomFrom(strings.toArray(new String[strings.size()])); + strings.remove(selection); + return selection; + } + + @Test + public void testNotStartedPrimary() throws InterruptedException, ExecutionException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + // no replicas in oder to skip the replication part + clusterService.setState(state(index, true, + randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED)); + + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + + Request request = new Request(shardId).timeout("1ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + primaryPhase.run(); + assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class); + + request = new Request(shardId); + listener = new PlainActionFuture<>(); + primaryPhase = action.new PrimaryPhase(request, listener); + primaryPhase.run(); + assertFalse("unassigned primary didn't cause a retry", listener.isDone()); + + clusterService.setState(state(index, true, ShardRoutingState.STARTED)); + logger.debug("--> primary assigned state:\n{}", clusterService.state().prettyPrint()); + + listener.get(); + assertTrue("request wasn't processed on primary, despite of it being assigned", request.processedOnPrimary.get()); + } + + @Test + public void testRoutingToPrimary() { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + + clusterService.setState(stateWithStartedPrimary(index, randomBoolean(), 3)); + + logger.debug("using state: \n{}", clusterService.state().prettyPrint()); + + final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + final String primaryNodeId = shardRoutingTable.primaryShard().currentNodeId(); + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + assertTrue(primaryPhase.checkBlocks()); + primaryPhase.routeRequestOrPerformLocally(shardRoutingTable.primaryShard(), shardRoutingTable.shardsIt()); + if (primaryNodeId.equals(clusterService.localNode().id())) { + logger.info("--> primary is assigned locally, testing for execution"); + assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get()); + } else { + logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); + final List capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + assertThat(capturedRequests.get(0).action, equalTo("testAction")); + } + } + + @Test + public void testWriteConsistency() { + action = new ActionWithConsistency(ImmutableSettings.EMPTY, "testActionWithConsistency", transportService, clusterService, threadPool); + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + final int assignedReplicas = randomInt(2); + final int unassignedReplicas = randomInt(2); + final int totalShards = 
1 + assignedReplicas + unassignedReplicas; + final boolean passesWriteConsistency; + Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); + switch (request.consistencyLevel()) { + case ONE: + passesWriteConsistency = true; + break; + case DEFAULT: + case QUORUM: + if (totalShards <= 2) { + passesWriteConsistency = true; // primary is enough + } else { + passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1; + } + break; + case ALL: + passesWriteConsistency = unassignedReplicas == 0; + break; + default: + throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]"); + } + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; + for (int i = 0; i < assignedReplicas; i++) { + replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); + } + for (int i = assignedReplicas; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.UNASSIGNED; + } + + clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates)); + logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}", + request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? "succeed" : "retry", + clusterService.state().prettyPrint()); + + final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + PlainActionFuture listener = new PlainActionFuture<>(); + + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + if (passesWriteConsistency) { + assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue()); + primaryPhase.run(); + assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get()); + } else { + assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue()); + primaryPhase.run(); + assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get()); + for (int i = 0; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.STARTED; + } + clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates)); + assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get()); + } + + } + + @Test + public void testReplication() throws ExecutionException, InterruptedException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + int assignedReplicas = 0; + int totalShards = 0; + for (ShardRouting shard : shardRoutingTable) { + totalShards++; + if (shard.primary() == false && shard.assignedToNode()) { + assignedReplicas++; + } + if (shard.relocating()) { + assignedReplicas++; + totalShards++; + } + } + + runReplicateTest(shardRoutingTable, assignedReplicas, totalShards); + } + + @Test + public void testReplicationWithShadowIndex() throws ExecutionException, InterruptedException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + + ClusterState state = stateWithStartedPrimary(index, true, 
randomInt(5)); + MetaData.Builder metaData = MetaData.builder(state.metaData()); + ImmutableSettings.Builder settings = ImmutableSettings.builder().put(metaData.get(index).settings()); + settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true); + metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings)); + clusterService.setState(ClusterState.builder(state).metaData(metaData)); + + final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); + int assignedReplicas = 0; + int totalShards = 0; + for (ShardRouting shard : shardRoutingTable) { + totalShards++; + if (shard.primary() && shard.relocating()) { + assignedReplicas++; + totalShards++; + } + } + + runReplicateTest(shardRoutingTable, assignedReplicas, totalShards); + } + + + protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException { + final ShardRouting primaryShard = shardRoutingTable.primaryShard(); + final ShardIterator shardIt = shardRoutingTable.shardsIt(); + final ShardId shardId = shardIt.shardId(); + final Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + + logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); + + + final TransportShardReplicationOperationAction.InternalRequest internalRequest = action.new InternalRequest(request); + internalRequest.concreteIndex(shardId.index().name()); + TransportShardReplicationOperationAction.ReplicationPhase replicationPhase = + action.new ReplicationPhase(shardIt, request, + new Response(), new ClusterStateObserver(clusterService, logger), + primaryShard, internalRequest, listener); + + assertThat(replicationPhase.totalShards(), equalTo(totalShards)); + assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); + replicationPhase.run(); + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + assertThat(capturedRequests.length, equalTo(assignedReplicas)); + if (assignedReplicas > 0) { + assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false)); + } + int pending = replicationPhase.pending(); + int criticalFailures = 0; // failures that should fail the shard + int successfull = 1; + for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) { + if (randomBoolean()) { + Throwable t; + if (randomBoolean()) { + t = new CorruptIndexException("simulated", (String) null); + criticalFailures++; + } else { + t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); + } + logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName()); + transport.handleResponse(capturedRequest.requestId, t); + } else { + successfull++; + transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE); + } + pending--; + assertThat(replicationPhase.pending(), equalTo(pending)); + assertThat(replicationPhase.successful(), equalTo(successfull)); + } + assertThat(listener.isDone(), equalTo(true)); + Response response = listener.get(); + final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(criticalFailures)); + assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures)); + assertThat(shardInfo.getSuccessful(), 
equalTo(successfull)); + assertThat(shardInfo.getTotal(), equalTo(totalShards)); + + assertThat("failed to see enough shard failures", transport.capturedRequests().length, equalTo(criticalFailures)); + for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) { + assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME)); + } + } + + + static class Request extends ShardReplicationOperationRequest { + int shardId; + public AtomicBoolean processedOnPrimary = new AtomicBoolean(); + public AtomicInteger processedOnReplicas = new AtomicInteger(); + + Request() { + this.operationThreaded(false); + } + + Request(ShardId shardId) { + this(); + this.shardId = shardId.id(); + this.index(shardId.index().name()); + // keep things simple + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardId); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = in.readVInt(); + } + } + + static class Response extends ActionWriteResponse { + + } + + static class Action extends TransportShardReplicationOperationAction { + + Action(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool) { + super(settings, actionName, transportService, clusterService, null, threadPool, + new ShardStateAction(settings, clusterService, transportService, null, null), + new ActionFilters(new HashSet())); + } + + @Override + protected Request newRequestInstance() { + return new Request(); + } + + @Override + protected Request newReplicaRequestInstance() { + return new Request(); + } + + @Override + protected Response newResponseInstance() { + return new Response(); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { + boolean executedBefore = shardRequest.request.processedOnPrimary.getAndSet(true); + assert executedBefore == false : "request has already been executed on the primary"; + return new Tuple<>(new Response(), shardRequest.request); + } + + @Override + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { + shardRequest.request.processedOnReplicas.incrementAndGet(); + } + + @Override + protected ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException { + return clusterState.getRoutingTable().index(request.concreteIndex()).shard(request.request().shardId).shardsIt(); + } + + @Override + protected boolean checkWriteConsistency() { + return false; + } + + @Override + protected boolean resolveIndex() { + return false; + } + } + + static class ActionWithConsistency extends Action { + + ActionWithConsistency(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(settings, actionName, transportService, clusterService, threadPool); + } + + @Override + protected boolean checkWriteConsistency() { + return true; + } + } + + static DiscoveryNode newNode(int nodeId) { + return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); + } + + +} diff --git a/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java b/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java new file mode 100644 index 
00000000000..27f09489763
--- /dev/null
+++ b/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.test.cluster;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.block.ClusterBlock;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.OperationRouting;
+import org.elasticsearch.cluster.service.PendingClusterTask;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.component.LifecycleListener;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+
+/** a class that simulates simple cluster service features, like state storage and listeners */
+public class TestClusterService implements ClusterService {
+
+    volatile ClusterState state;
+    private final Collection<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
+    private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
+    private final ThreadPool threadPool;
+
+    public TestClusterService() {
+        this(ClusterState.builder(new ClusterName("test")).build());
+    }
+
+    public TestClusterService(ThreadPool threadPool) {
+        this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
+    }
+
+    public TestClusterService(ClusterState state) {
+        this(state, null);
+    }
+
+    public TestClusterService(ClusterState state, @Nullable ThreadPool threadPool) {
+        if (state.getNodes().size() == 0) {
+            state = ClusterState.builder(state).nodes(
+                    DiscoveryNodes.builder()
+                            .put(new DiscoveryNode("test_id", DummyTransportAddress.INSTANCE, Version.CURRENT))
+                            .localNodeId("test_id")).build();
+        }
+
+        assert state.getNodes().localNode() != null;
+        this.state = state;
+        this.threadPool = threadPool;
+
+    }
+
+
+    /** set the current state and trigger any registered listeners about the change */
+    public void setState(ClusterState state) {
+        assert state.getNodes().localNode() != null;
+        // make sure we have a version increment
+        state =
ClusterState.builder(state).version(this.state.version() + 1).build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", state, this.state); + this.state = state; + for (ClusterStateListener listener : listeners) { + listener.clusterChanged(event); + } + } + + /** set the current state and trigger any registered listeners about the change */ + public void setState(ClusterState.Builder state) { + setState(state.build()); + } + + @Override + public DiscoveryNode localNode() { + return state.getNodes().localNode(); + } + + @Override + public ClusterState state() { + return state; + } + + @Override + public void addInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException { + throw new UnsupportedOperationException(); + + } + + @Override + public void removeInitialStateBlock(ClusterBlock block) throws ElasticsearchIllegalStateException { + throw new UnsupportedOperationException(); + + } + + @Override + public OperationRouting operationRouting() { + return null; + } + + @Override + public void addFirst(ClusterStateListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void addLast(ClusterStateListener listener) { + listeners.add(listener); + } + + @Override + public void add(ClusterStateListener listener) { + listeners.add(listener); + } + + @Override + public void remove(ClusterStateListener listener) { + listeners.remove(listener); + for (Iterator it = onGoingTimeouts.iterator(); it.hasNext(); ) { + NotifyTimeout timeout = it.next(); + if (timeout.listener.equals(listener)) { + timeout.cancel(); + it.remove(); + } + } + } + + @Override + public void add(LocalNodeMasterListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void remove(LocalNodeMasterListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void add(final TimeValue timeout, final TimeoutClusterStateListener listener) { + if (threadPool == null) { + throw new UnsupportedOperationException("TestClusterService wasn't initialized with a thread pool"); + } + NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); + notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); + onGoingTimeouts.add(notifyTimeout); + listeners.add(listener); + listener.postAdded(); + } + + @Override + public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) { + throw new UnsupportedOperationException(); + } + + @Override + public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) { + throw new UnsupportedOperationException(); + } + + @Override + public List pendingTasks() { + throw new UnsupportedOperationException(); + + } + + @Override + public int numberOfPendingTasks() { + throw new UnsupportedOperationException(); + } + + @Override + public Lifecycle.State lifecycleState() { + throw new UnsupportedOperationException(); + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public ClusterService start() throws ElasticsearchException { + throw new UnsupportedOperationException(); + } + + @Override + public ClusterService stop() throws ElasticsearchException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws ElasticsearchException 
{ + throw new UnsupportedOperationException(); + } + + class NotifyTimeout implements Runnable { + final TimeoutClusterStateListener listener; + final TimeValue timeout; + volatile ScheduledFuture future; + + NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) { + this.listener = listener; + this.timeout = timeout; + } + + public void cancel() { + FutureUtils.cancel(future); + } + + @Override + public void run() { + if (future != null && future.isCancelled()) { + return; + } + listener.onTimeout(this.timeout); + // note, we rely on the listener to remove itself in case of timeout if needed + } + } +} diff --git a/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java b/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java new file mode 100644 index 00000000000..ca4c7950345 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -0,0 +1,180 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */ +public class CapturingTransport implements Transport { + private TransportServiceAdapter adapter; + + static public class CapturedRequest { + final public DiscoveryNode node; + final public long requestId; + final public String action; + final public TransportRequest request; + + public CapturedRequest(DiscoveryNode node, long requestId, String action, TransportRequest request) { + this.node = node; + this.requestId = requestId; + this.action = action; + this.request = request; + } + } + + private BlockingQueue capturedRequests = ConcurrentCollections.newBlockingQueue(); + + /** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */ + public CapturedRequest[] capturedRequests() { + return capturedRequests.toArray(new CapturedRequest[0]); + } + + /** + * returns all requests captured so far, grouped by target node. + * Doesn't clear the captured request list. 
See {@link #clear()} + */ + public Map> capturedRequestsByTargetNode() { + Map> map = new HashMap<>(); + for (CapturedRequest request : capturedRequests) { + List nodeList = map.get(request.node.id()); + if (nodeList == null) { + nodeList = new ArrayList<>(); + map.put(request.node.id(), nodeList); + } + nodeList.add(request); + } + return map; + } + + /** clears captured requests */ + public void clear() { + capturedRequests.clear(); + } + + /** simulate a response for the given requestId */ + public void handleResponse(final long requestId, final TransportResponse response) { + adapter.onResponseReceived(requestId).handleResponse(response); + } + + /** simulate a remote error for the given requesTId */ + public void handleResponse(final long requestId, final Throwable t) { + adapter.onResponseReceived(requestId).handleException(new RemoteTransportException("remote failure", t)); + } + + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + capturedRequests.add(new CapturedRequest(node, requestId, action, request)); + } + + + @Override + public void transportServiceAdapter(TransportServiceAdapter adapter) { + this.adapter = adapter; + } + + @Override + public BoundTransportAddress boundAddress() { + return null; + } + + @Override + public Map profileBoundAddresses() { + return null; + } + + @Override + public TransportAddress[] addressesFromString(String address) throws Exception { + return new TransportAddress[0]; + } + + @Override + public boolean addressSupported(Class address) { + return false; + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return true; + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + + } + + @Override + public long serverOpen() { + return 0; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + + } + + @Override + public Transport start() throws ElasticsearchException { + return null; + } + + @Override + public Transport stop() throws ElasticsearchException { + return null; + } + + @Override + public void close() throws ElasticsearchException { + + } +}
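Taken together, the two utilities above are what make the new unit tests possible without starting a node: TestClusterService hands the action under test arbitrary, hand-built cluster states, while CapturingTransport records every outbound request so the test can answer (or fail) each one explicitly. A minimal sketch of the resulting test pattern, using only calls that appear in this diff (the threadPool variable and the action wiring are assumed to be set up as in the tests above):

    CapturingTransport transport = new CapturingTransport();
    TestClusterService clusterService = new TestClusterService(threadPool);
    // feed the action a synthetic state: a one-shard index with a started primary and three replicas
    clusterService.setState(stateWithStartedPrimary("test", true, 3));
    // ... run the action under test, then acknowledge every captured replica-level request ...
    for (CapturingTransport.CapturedRequest req : transport.capturedRequests()) {
        transport.handleResponse(req.requestId, TransportResponse.Empty.INSTANCE);
    }
    transport.clear();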